diem_logger/
diem_logger.rs

1// Copyright (c) The Diem Core Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4// Copyright 2021 Conflux Foundation. All rights reserved.
5// Conflux is free software and distributed under GNU General Public License.
6// See http://www.gnu.org/licenses/
7
8//! Implementation of writing logs to both local printers (e.g. stdout) and
9//! remote loggers (e.g. Logstash)
10
11use crate::{
12    counters::{
13        PROCESSED_STRUCT_LOG_COUNT, SENT_STRUCT_LOG_BYTES,
14        SENT_STRUCT_LOG_COUNT, STRUCT_LOG_PARSE_ERROR_COUNT,
15        STRUCT_LOG_QUEUE_ERROR_COUNT, STRUCT_LOG_SEND_ERROR_COUNT,
16    },
17    logger::Logger,
18    struct_log::TcpWriter,
19    Event, Filter, Level, LevelFilter, Metadata,
20};
21use backtrace::Backtrace;
22use chrono::{SecondsFormat, Utc};
23use diem_infallible::RwLock;
24use once_cell::sync::Lazy;
25use parking_lot::Mutex;
26use pipe_logger_lib::{PipeLogger, PipeLoggerBuilder, RotateMethod};
27use serde::Serialize;
28use std::{
29    collections::BTreeMap,
30    env, fmt,
31    io::Write,
32    sync::{
33        mpsc::{self, Receiver, SyncSender},
34        Arc,
35    },
36    thread,
37};
38
/// Name of the environment variable that, when set, configures the local
/// log filter.
const RUST_LOG: &str = "RUST_LOG";
/// Default capacity of the log write channel. Once the channel is full,
/// further logs are dropped.
pub const CHANNEL_SIZE: usize = 10_000;
/// Number of extra attempts made to send a log line to the remote endpoint
/// after the first attempt has failed.
const NUM_SEND_RETRIES: u8 = 1;
44
45/// A single log entry emitted by a logging macro with associated metadata
46#[derive(Debug, Serialize)]
47pub struct LogEntry {
48    #[serde(flatten)]
49    metadata: Metadata,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    thread_name: Option<String>,
52    /// The program backtrace taken when the event occurred. Backtraces are
53    /// only supported for errors.
54    #[serde(skip_serializing_if = "Option::is_none")]
55    backtrace: Option<String>,
56    #[serde(skip_serializing_if = "Option::is_none")]
57    hostname: Option<&'static str>,
58    timestamp: String,
59    #[serde(skip_serializing_if = "BTreeMap::is_empty")]
60    data: BTreeMap<&'static str, serde_json::Value>,
61    #[serde(skip_serializing_if = "Option::is_none")]
62    message: Option<String>,
63}
64
65impl LogEntry {
66    fn new(event: &Event, thread_name: Option<&str>) -> Self {
67        use crate::{Key, Value, Visitor};
68
69        struct JsonVisitor<'a>(
70            &'a mut BTreeMap<&'static str, serde_json::Value>,
71        );
72
73        impl<'a> Visitor for JsonVisitor<'a> {
74            fn visit_pair(&mut self, key: Key, value: Value<'_>) {
75                let v = match value {
76                    Value::Debug(d) => {
77                        serde_json::Value::String(format!("{:?}", d))
78                    }
79                    Value::Display(d) => {
80                        serde_json::Value::String(d.to_string())
81                    }
82                    Value::Serde(s) => match serde_json::to_value(s) {
83                        Ok(value) => value,
84                        Err(e) => {
85                            eprintln!(
86                                "error serializing structured log: {}",
87                                e
88                            );
89                            return;
90                        }
91                    },
92                };
93
94                self.0.insert(key.as_str(), v);
95            }
96        }
97
98        let metadata = *event.metadata();
99        let thread_name = thread_name.map(ToOwned::to_owned);
100        let message = event.message().map(fmt::format);
101
102        static HOSTNAME: Lazy<Option<String>> = Lazy::new(|| {
103            hostname::get()
104                .ok()
105                .and_then(|name| name.into_string().ok())
106        });
107
108        let hostname = HOSTNAME.as_deref();
109
110        let backtrace = match metadata.level() {
111            Level::Error => {
112                let mut backtrace = Backtrace::new();
113                let mut frames = backtrace.frames().to_vec();
114                if frames.len() > 3 {
115                    frames.drain(0..3); // Remove the first 3 unnecessary frames
116                                        // to simplify
117                                        // backtrace
118                }
119                backtrace = frames.into();
120                Some(format!("{:?}", backtrace))
121            }
122            _ => None,
123        };
124
125        let mut data = BTreeMap::new();
126        for schema in event.keys_and_values() {
127            schema.visit(&mut JsonVisitor(&mut data));
128        }
129
130        Self {
131            metadata,
132            thread_name,
133            backtrace,
134            hostname,
135            timestamp: Utc::now().to_rfc3339_opts(SecondsFormat::Micros, true),
136            data,
137            message,
138        }
139    }
140
141    pub fn metadata(&self) -> &Metadata { &self.metadata }
142
143    pub fn thread_name(&self) -> Option<&str> { self.thread_name.as_deref() }
144
145    pub fn backtrace(&self) -> Option<&str> { self.backtrace.as_deref() }
146
147    pub fn hostname(&self) -> Option<&str> { self.hostname.as_deref() }
148
149    pub fn timestamp(&self) -> &str { self.timestamp.as_str() }
150
151    pub fn data(&self) -> &BTreeMap<&'static str, serde_json::Value> {
152        &self.data
153    }
154
155    pub fn message(&self) -> Option<&str> { self.message.as_deref() }
156}
157
158/// A builder for a `DiemLogger`, configures what, where, and how to write logs.
159pub struct DiemLoggerBuilder {
160    channel_size: usize,
161    level: Level,
162    remote_level: Level,
163    address: Option<String>,
164    printer: Option<Box<dyn Writer>>,
165    is_async: bool,
166    custom_format: Option<fn(&LogEntry) -> Result<String, fmt::Error>>,
167}
168
169impl DiemLoggerBuilder {
170    #[allow(clippy::new_without_default)]
171    pub fn new() -> Self {
172        Self {
173            channel_size: CHANNEL_SIZE,
174            level: Level::Info,
175            remote_level: Level::Debug,
176            address: None,
177            printer: Some(Box::new(StderrWriter)),
178            is_async: false,
179            custom_format: None,
180        }
181    }
182
183    pub fn address(&mut self, address: String) -> &mut Self {
184        self.address = Some(address);
185        self
186    }
187
188    pub fn read_env(&mut self) -> &mut Self {
189        if let Ok(address) = env::var("STRUCT_LOG_TCP_ADDR") {
190            self.address(address);
191        }
192        self
193    }
194
195    pub fn level(&mut self, level: Level) -> &mut Self {
196        self.level = level;
197        self
198    }
199
200    pub fn remote_level(&mut self, level: Level) -> &mut Self {
201        self.remote_level = level;
202        self
203    }
204
205    pub fn channel_size(&mut self, channel_size: usize) -> &mut Self {
206        self.channel_size = channel_size;
207        self
208    }
209
210    pub fn printer(
211        &mut self, printer: Box<dyn Writer + Send + Sync + 'static>,
212    ) -> &mut Self {
213        self.printer = Some(printer);
214        self
215    }
216
217    pub fn is_async(&mut self, is_async: bool) -> &mut Self {
218        self.is_async = is_async;
219        self
220    }
221
222    pub fn custom_format(
223        &mut self, format: fn(&LogEntry) -> Result<String, fmt::Error>,
224    ) -> &mut Self {
225        self.custom_format = Some(format);
226        self
227    }
228
229    pub fn init(&mut self) { self.build(); }
230
231    pub fn build(&mut self) -> Arc<DiemLogger> {
232        let filter = {
233            let local_filter = {
234                let mut filter_builder = Filter::builder();
235
236                if env::var(RUST_LOG).is_ok() {
237                    filter_builder.with_env(RUST_LOG);
238                } else {
239                    filter_builder.filter_level(self.level.into());
240                }
241
242                filter_builder.build()
243            };
244            let remote_filter = {
245                let mut filter_builder = Filter::builder();
246
247                if self.is_async && self.address.is_some() {
248                    filter_builder.filter_level(self.remote_level.into());
249                } else {
250                    filter_builder.filter_level(LevelFilter::Off);
251                }
252
253                filter_builder.build()
254            };
255
256            DiemFilter {
257                local_filter,
258                remote_filter,
259            }
260        };
261
262        let logger = if self.is_async {
263            let (sender, receiver) = mpsc::sync_channel(self.channel_size);
264            let logger = Arc::new(DiemLogger {
265                sender: Some(sender),
266                printer: None,
267                filter: RwLock::new(filter),
268                formatter: self.custom_format.take().unwrap_or(default_format),
269            });
270            let service = LoggerService {
271                receiver,
272                address: self.address.clone(),
273                printer: self.printer.take(),
274                facade: logger.clone(),
275            };
276
277            thread::spawn(move || service.run());
278            logger
279        } else {
280            Arc::new(DiemLogger {
281                sender: None,
282                printer: self.printer.take(),
283                filter: RwLock::new(filter),
284                formatter: self.custom_format.take().unwrap_or(default_format),
285            })
286        };
287
288        crate::logger::set_global_logger(logger.clone());
289        logger
290    }
291}
292
293/// A combination of `Filter`s to control where logs are written
294struct DiemFilter {
295    /// The local printer `Filter` to control what is logged in text output
296    local_filter: Filter,
297    /// The remote logging `Filter` to control what is sent to external logging
298    remote_filter: Filter,
299}
300
301impl DiemFilter {
302    fn enabled(&self, metadata: &Metadata) -> bool {
303        self.local_filter.enabled(metadata)
304            || self.remote_filter.enabled(metadata)
305    }
306}
307
308pub struct DiemLogger {
309    sender: Option<SyncSender<LoggerServiceEvent>>,
310    printer: Option<Box<dyn Writer>>,
311    filter: RwLock<DiemFilter>,
312    pub(crate) formatter: fn(&LogEntry) -> Result<String, fmt::Error>,
313}
314
315impl DiemLogger {
316    pub fn builder() -> DiemLoggerBuilder { DiemLoggerBuilder::new() }
317
318    #[allow(clippy::new_ret_no_self)]
319    pub fn new() -> DiemLoggerBuilder { Self::builder() }
320
321    pub fn init_for_testing() {
322        if env::var(RUST_LOG).is_err() {
323            return;
324        }
325
326        Self::builder()
327            .is_async(false)
328            .printer(Box::new(StderrWriter))
329            .build();
330    }
331
332    pub fn set_filter(&self, filter: Filter) {
333        self.filter.write().local_filter = filter;
334    }
335
336    pub fn set_remote_filter(&self, filter: Filter) {
337        self.filter.write().remote_filter = filter;
338    }
339
340    fn send_entry(&self, entry: LogEntry) {
341        if let Some(printer) = &self.printer {
342            let s = (self.formatter)(&entry).expect("Unable to format");
343            printer.write(s);
344        }
345
346        if let Some(sender) = &self.sender {
347            if let Err(e) = sender.try_send(LoggerServiceEvent::LogEntry(entry))
348            {
349                STRUCT_LOG_QUEUE_ERROR_COUNT.inc();
350                eprintln!("Failed to send structured log: {}", e);
351            }
352        }
353    }
354}
355
356impl Logger for DiemLogger {
357    fn enabled(&self, metadata: &Metadata) -> bool {
358        self.filter.read().enabled(metadata)
359    }
360
361    fn record(&self, event: &Event) {
362        let entry = LogEntry::new(event, ::std::thread::current().name());
363
364        self.send_entry(entry)
365    }
366
367    fn flush(&self) {
368        if let Some(sender) = &self.sender {
369            let (oneshot_sender, oneshot_receiver) = mpsc::sync_channel(1);
370            sender
371                .send(LoggerServiceEvent::Flush(oneshot_sender))
372                .unwrap();
373            oneshot_receiver.recv().unwrap();
374        }
375    }
376}
377
378enum LoggerServiceEvent {
379    LogEntry(LogEntry),
380    Flush(SyncSender<()>),
381}
382
383/// A service for running a log listener, that will continually export logs
384/// through a local printer or to a `DiemLogger` for external logging.
385struct LoggerService {
386    receiver: Receiver<LoggerServiceEvent>,
387    address: Option<String>,
388    printer: Option<Box<dyn Writer>>,
389    facade: Arc<DiemLogger>,
390}
391
392impl LoggerService {
393    pub fn run(mut self) {
394        let mut writer = self.address.take().map(TcpWriter::new);
395
396        for event in self.receiver {
397            match event {
398                LoggerServiceEvent::LogEntry(entry) => {
399                    PROCESSED_STRUCT_LOG_COUNT.inc();
400
401                    if let Some(printer) = &self.printer {
402                        if self
403                            .facade
404                            .filter
405                            .read()
406                            .local_filter
407                            .enabled(&entry.metadata)
408                        {
409                            let s = (self.facade.formatter)(&entry)
410                                .expect("Unable to format");
411                            printer.write(s)
412                        }
413                    }
414
415                    if let Some(writer) = &mut writer {
416                        if self
417                            .facade
418                            .filter
419                            .read()
420                            .remote_filter
421                            .enabled(&entry.metadata)
422                        {
423                            Self::write_to_logstash(writer, entry);
424                        }
425                    }
426                }
427                LoggerServiceEvent::Flush(sender) => {
428                    // This is just to notify the other side, the logger doesn't
429                    // actually care if the listener is
430                    // still listening
431                    let _ = sender.send(());
432                }
433            }
434        }
435    }
436
437    /// Writes a log line into json_lines logstash format, which has a newline
438    /// at the end
439    fn write_to_logstash(stream: &mut TcpWriter, mut entry: LogEntry) {
440        // XXX Temporary hack to ensure that log lines don't show up empty in
441        // kibana when the "message" field isn't set.
442        if entry.message.is_none() {
443            entry.message = Some(serde_json::to_string(&entry.data).unwrap());
444        }
445
446        let message = if let Ok(json) = serde_json::to_string(&entry) {
447            json
448        } else {
449            STRUCT_LOG_PARSE_ERROR_COUNT.inc();
450            return;
451        };
452
453        let message = message + "\n";
454        let bytes = message.as_bytes();
455        let message_length = bytes.len();
456
457        // Attempt to write the log up to NUM_SEND_RETRIES + 1, and then drop it
458        // Each `write_all` call will attempt to open a connection if one isn't
459        // open
460        let mut result = stream.write_all(bytes);
461        for _ in 0..NUM_SEND_RETRIES {
462            if result.is_ok() {
463                break;
464            } else {
465                result = stream.write_all(bytes);
466            }
467        }
468
469        if let Err(e) = result {
470            STRUCT_LOG_SEND_ERROR_COUNT.inc();
471            eprintln!(
472                "[Logging] Error while sending data to logstash({}): {}",
473                stream.endpoint(),
474                e
475            );
476        } else {
477            SENT_STRUCT_LOG_COUNT.inc();
478            SENT_STRUCT_LOG_BYTES.inc_by(message_length as u64);
479        }
480    }
481}
482
/// A trait encapsulating the operation required of a log destination.
/// Implementors must be shareable across threads.
pub trait Writer: Send + Sync {
    /// Writes one already-formatted log line.
    fn write(&self, log: String);
}

/// Log destination that prints to stderr.
struct StderrWriter;

impl Writer for StderrWriter {
    /// Prints `log` to stderr, followed by a newline.
    fn write(&self, log: String) {
        eprintln!("{}", log);
    }
}
498
499/// A struct for writing logs to a file
500pub struct FileWriter {
501    log_file: RwLock<std::fs::File>,
502}
503
504impl FileWriter {
505    pub fn new(log_file: std::path::PathBuf) -> Self {
506        let file = std::fs::OpenOptions::new()
507            .append(true)
508            .create(true)
509            .open(log_file)
510            .expect("Unable to open log file");
511        Self {
512            log_file: RwLock::new(file),
513        }
514    }
515}
516
517impl Writer for FileWriter {
518    /// Write to file
519    fn write(&self, log: String) {
520        if let Err(err) = writeln!(self.log_file.write(), "{}", log) {
521            eprintln!("Unable to write to log file: {}", err.to_string());
522        }
523    }
524}
525
526pub struct RollingFileWriter {
527    log_file: Mutex<PipeLogger>,
528}
529
530impl RollingFileWriter {
531    pub fn new(
532        log_path: std::path::PathBuf, count: usize, size_mb: usize,
533    ) -> Self {
534        let mut builder = PipeLoggerBuilder::new(log_path);
535        builder
536            .set_compress(true)
537            .set_rotate(Some(RotateMethod::FileSize(
538                size_mb as u64 * 1_000_000,
539            )))
540            .set_count(Some(count));
541        let log_file = builder.build().unwrap();
542        Self {
543            log_file: Mutex::new(log_file),
544        }
545    }
546}
547
548impl Writer for RollingFileWriter {
549    fn write(&self, log: String) {
550        if let Err(e) = self.log_file.lock().write_line(&log) {
551            eprintln!("Unable to write to log file: {}", e.to_string());
552        }
553    }
554}
555
556/// Converts a record into a string representation:
557/// UNIX_TIMESTAMP LOG_LEVEL [thread_name] FILE:LINE MESSAGE JSON_DATA
558/// Example:
559/// 2020-03-07 05:03:03 INFO [thread_name] common/diem-logger/src/lib.rs:261
560/// Hello { "world": true }
561fn default_format(entry: &LogEntry) -> Result<String, fmt::Error> {
562    use std::fmt::Write;
563
564    let mut w = String::new();
565    write!(w, "{}", entry.timestamp)?;
566
567    if let Some(thread_name) = &entry.thread_name {
568        write!(w, " [{}]", thread_name)?;
569    }
570
571    write!(
572        w,
573        " {} {}",
574        entry.metadata.level(),
575        entry.metadata.location()
576    )?;
577
578    if let Some(message) = &entry.message {
579        write!(w, " {}", message)?;
580    }
581
582    if !entry.data.is_empty() {
583        write!(w, " {}", serde_json::to_string(&entry.data).unwrap())?;
584    }
585
586    Ok(w)
587}
588
#[cfg(test)]
mod tests {
    use super::LogEntry;
    use crate::{
        debug, error, info, logger::Logger, trace, warn, Event, Key, KeyValue,
        Level, Metadata, Schema, Value, Visitor,
    };
    use chrono::{DateTime, Utc};
    use serde_json::Value as JsonValue;
    use std::{
        sync::{
            mpsc::{self, Receiver, SyncSender},
            Arc,
        },
        thread,
    };

    /// Serializable enum used to check that serde renaming is respected.
    #[derive(serde::Serialize)]
    #[serde(rename_all = "snake_case")]
    enum Enum {
        FooBar,
    }

    /// Schema exposing two serde-backed fields, `foo` and `bar`.
    struct TestSchema<'a> {
        foo: usize,
        bar: &'a Enum,
    }

    impl Schema for TestSchema<'_> {
        fn visit(&self, visitor: &mut dyn Visitor) {
            visitor.visit_pair(Key::new("foo"), Value::from_serde(&self.foo));
            visitor.visit_pair(Key::new("bar"), Value::from_serde(&self.bar));
        }
    }

    /// Type whose `Debug` output is a fixed marker string.
    struct DebugStruct {}

    impl std::fmt::Debug for DebugStruct {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "DebugStruct!")
        }
    }

    /// Type whose `Display` output is a fixed marker string.
    struct DisplayStruct {}

    impl std::fmt::Display for DisplayStruct {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "DisplayStruct!")
        }
    }

    /// Logger that forwards every recorded entry over a channel so the test
    /// can inspect it.
    struct LogStream(SyncSender<LogEntry>);

    impl LogStream {
        fn new() -> (Self, Receiver<LogEntry>) {
            let (tx, rx) = mpsc::sync_channel(1024);
            (Self(tx), rx)
        }
    }

    impl Logger for LogStream {
        fn enabled(&self, metadata: &Metadata) -> bool {
            metadata.level() <= Level::Debug
        }

        fn record(&self, event: &Event) {
            let entry = LogEntry::new(event, ::std::thread::current().name());
            self.0.send(entry).unwrap();
        }

        fn flush(&self) {}
    }

    /// Installs a `LogStream` as the global logger and returns the receiving
    /// end of its channel.
    fn set_test_logger() -> Receiver<LogEntry> {
        let (stream, rx) = LogStream::new();
        crate::logger::set_global_logger(Arc::new(stream));
        rx
    }

    // TODO: Find a better mechanism for testing that allows setting the logger
    // not globally
    #[test]
    fn basic() {
        let rx = set_test_logger();
        let number = 12345;

        // Send an info log
        let before = Utc::now();
        info!(
            TestSchema {
                foo: 5,
                bar: &Enum::FooBar
            },
            test = true,
            category = "name",
            KeyValue::new("display", Value::from_display(&number)),
            "This is a log"
        );
        let after = Utc::now();

        let entry = rx.recv().unwrap();

        // Ensure standard fields are filled
        assert_eq!(entry.metadata.level(), Level::Info);
        assert_eq!(
            entry.metadata.target(),
            module_path!().split("::").next().unwrap()
        );
        assert_eq!(entry.metadata.module_path(), module_path!());
        assert_eq!(entry.metadata.file(), file!());
        assert_eq!(entry.message.as_deref(), Some("This is a log"));
        assert!(entry.backtrace.is_none());

        // Log time should be the time the structured log entry was created
        let parsed = DateTime::parse_from_rfc3339(&entry.timestamp).unwrap();
        let logged_at: DateTime<Utc> = DateTime::from(parsed);
        assert!(before <= logged_at && logged_at <= after);

        // Ensure data stored is the right type
        let number_text = number.to_string();
        assert_eq!(entry.data.get("foo").and_then(JsonValue::as_u64), Some(5));
        assert_eq!(
            entry.data.get("bar").and_then(JsonValue::as_str),
            Some("foo_bar")
        );
        assert_eq!(
            entry.data.get("display").and_then(JsonValue::as_str),
            Some(number_text.as_str()),
        );
        assert_eq!(
            entry.data.get("test").and_then(JsonValue::as_bool),
            Some(true),
        );
        assert_eq!(
            entry.data.get("category").and_then(JsonValue::as_str),
            Some("name"),
        );

        // Test error logs contain backtraces
        error!("This is an error log");
        let error_entry = rx.recv().unwrap();
        assert!(error_entry.backtrace.is_some());

        // Test all log levels work properly
        // Tracing should be skipped because the Logger was setup to skip
        // Tracing events
        trace!("trace");
        debug!("debug");
        info!("info");
        warn!("warn");
        error!("error");

        let expected_levels =
            [Level::Debug, Level::Info, Level::Warn, Level::Error];

        for expected in expected_levels.iter() {
            let received = rx.recv().unwrap();
            assert_eq!(received.metadata.level(), *expected);
        }

        // Verify that the thread name is properly included
        let worker = thread::Builder::new()
            .name("named thread".into())
            .spawn(|| info!("thread"))
            .unwrap();

        worker.join().unwrap();
        let threaded_entry = rx.recv().unwrap();
        assert_eq!(threaded_entry.thread_name.as_deref(), Some("named thread"));

        // Test Debug and Display inputs
        let debug_struct = DebugStruct {};
        let display_struct = DisplayStruct {};

        error!(identifier = ?debug_struct, "Debug test");
        error!(identifier = ?debug_struct, other = "value", "Debug2 test");
        error!(identifier = %display_struct, "Display test");
        error!(identifier = %display_struct, other = "value", "Display2 test");
        error!("Literal" = ?debug_struct, "Debug test");
        error!("Literal" = ?debug_struct, other = "value", "Debug test");
        error!("Literal" = %display_struct, "Display test");
        error!("Literal" = %display_struct, other = "value", "Display2 test");
        error!("Literal" = %display_struct, other = "value", identifier = ?debug_struct, "Mixed test");
    }
}