1use crate::{
12 counters::{
13 PROCESSED_STRUCT_LOG_COUNT, SENT_STRUCT_LOG_BYTES,
14 SENT_STRUCT_LOG_COUNT, STRUCT_LOG_PARSE_ERROR_COUNT,
15 STRUCT_LOG_QUEUE_ERROR_COUNT, STRUCT_LOG_SEND_ERROR_COUNT,
16 },
17 logger::Logger,
18 struct_log::TcpWriter,
19 Event, Filter, Level, LevelFilter, Metadata,
20};
21use backtrace::Backtrace;
22use chrono::{SecondsFormat, Utc};
23use diem_infallible::RwLock;
24use once_cell::sync::Lazy;
25use parking_lot::Mutex;
26use pipe_logger_lib::{PipeLogger, PipeLoggerBuilder, RotateMethod};
27use serde::Serialize;
28use std::{
29 collections::BTreeMap,
30 env, fmt,
31 io::Write,
32 sync::{
33 mpsc::{self, Receiver, SyncSender},
34 Arc,
35 },
36 thread,
37};
38
/// Name of the environment variable consulted when building the local filter.
const RUST_LOG: &str = "RUST_LOG";
/// Default capacity of the bounded channel that feeds the async logger service.
pub const CHANNEL_SIZE: usize = 10000;
/// Number of extra attempts made after a failed write to the remote endpoint.
const NUM_SEND_RETRIES: u8 = 1;
44
/// A single structured log record built from a recorded event.
///
/// Serialized with serde: `metadata` is flattened into the enclosing object,
/// and optional fields that are `None` (as well as an empty `data` map) are
/// left out of the serialized output.
#[derive(Debug, Serialize)]
pub struct LogEntry {
    /// Metadata copied from the originating event.
    #[serde(flatten)]
    metadata: Metadata,
    /// Name of the recording thread, when one was supplied.
    #[serde(skip_serializing_if = "Option::is_none")]
    thread_name: Option<String>,
    /// Debug-formatted backtrace; only captured for `Level::Error` events.
    #[serde(skip_serializing_if = "Option::is_none")]
    backtrace: Option<String>,
    /// Hostname of the machine, if it could be resolved.
    #[serde(skip_serializing_if = "Option::is_none")]
    hostname: Option<&'static str>,
    /// RFC 3339 UTC timestamp (microsecond precision) taken when the entry
    /// was built.
    timestamp: String,
    /// Structured key/value pairs attached to the event, as JSON values.
    #[serde(skip_serializing_if = "BTreeMap::is_empty")]
    data: BTreeMap<&'static str, serde_json::Value>,
    /// Formatted message of the event, if it carried one.
    #[serde(skip_serializing_if = "Option::is_none")]
    message: Option<String>,
}
64
65impl LogEntry {
66 fn new(event: &Event, thread_name: Option<&str>) -> Self {
67 use crate::{Key, Value, Visitor};
68
69 struct JsonVisitor<'a>(
70 &'a mut BTreeMap<&'static str, serde_json::Value>,
71 );
72
73 impl<'a> Visitor for JsonVisitor<'a> {
74 fn visit_pair(&mut self, key: Key, value: Value<'_>) {
75 let v = match value {
76 Value::Debug(d) => {
77 serde_json::Value::String(format!("{:?}", d))
78 }
79 Value::Display(d) => {
80 serde_json::Value::String(d.to_string())
81 }
82 Value::Serde(s) => match serde_json::to_value(s) {
83 Ok(value) => value,
84 Err(e) => {
85 eprintln!(
86 "error serializing structured log: {}",
87 e
88 );
89 return;
90 }
91 },
92 };
93
94 self.0.insert(key.as_str(), v);
95 }
96 }
97
98 let metadata = *event.metadata();
99 let thread_name = thread_name.map(ToOwned::to_owned);
100 let message = event.message().map(fmt::format);
101
102 static HOSTNAME: Lazy<Option<String>> = Lazy::new(|| {
103 hostname::get()
104 .ok()
105 .and_then(|name| name.into_string().ok())
106 });
107
108 let hostname = HOSTNAME.as_deref();
109
110 let backtrace = match metadata.level() {
111 Level::Error => {
112 let mut backtrace = Backtrace::new();
113 let mut frames = backtrace.frames().to_vec();
114 if frames.len() > 3 {
115 frames.drain(0..3); }
119 backtrace = frames.into();
120 Some(format!("{:?}", backtrace))
121 }
122 _ => None,
123 };
124
125 let mut data = BTreeMap::new();
126 for schema in event.keys_and_values() {
127 schema.visit(&mut JsonVisitor(&mut data));
128 }
129
130 Self {
131 metadata,
132 thread_name,
133 backtrace,
134 hostname,
135 timestamp: Utc::now().to_rfc3339_opts(SecondsFormat::Micros, true),
136 data,
137 message,
138 }
139 }
140
141 pub fn metadata(&self) -> &Metadata { &self.metadata }
142
143 pub fn thread_name(&self) -> Option<&str> { self.thread_name.as_deref() }
144
145 pub fn backtrace(&self) -> Option<&str> { self.backtrace.as_deref() }
146
147 pub fn hostname(&self) -> Option<&str> { self.hostname.as_deref() }
148
149 pub fn timestamp(&self) -> &str { self.timestamp.as_str() }
150
151 pub fn data(&self) -> &BTreeMap<&'static str, serde_json::Value> {
152 &self.data
153 }
154
155 pub fn message(&self) -> Option<&str> { self.message.as_deref() }
156}
157
/// Builder used to configure and install a [`DiemLogger`].
pub struct DiemLoggerBuilder {
    /// Capacity of the bounded channel created in async mode.
    channel_size: usize,
    /// Level used for the local filter when `RUST_LOG` is not set.
    level: Level,
    /// Level used for the remote filter; only applied in async mode when an
    /// address is configured.
    remote_level: Level,
    /// Address handed to the TCP writer for remote logging, if any.
    address: Option<String>,
    /// Writer that receives locally formatted log lines.
    printer: Option<Box<dyn Writer>>,
    /// Whether entries are handed off to a background service thread.
    is_async: bool,
    /// Optional replacement for the default line formatter.
    custom_format: Option<fn(&LogEntry) -> Result<String, fmt::Error>>,
}
168
169impl DiemLoggerBuilder {
170 #[allow(clippy::new_without_default)]
171 pub fn new() -> Self {
172 Self {
173 channel_size: CHANNEL_SIZE,
174 level: Level::Info,
175 remote_level: Level::Debug,
176 address: None,
177 printer: Some(Box::new(StderrWriter)),
178 is_async: false,
179 custom_format: None,
180 }
181 }
182
183 pub fn address(&mut self, address: String) -> &mut Self {
184 self.address = Some(address);
185 self
186 }
187
188 pub fn read_env(&mut self) -> &mut Self {
189 if let Ok(address) = env::var("STRUCT_LOG_TCP_ADDR") {
190 self.address(address);
191 }
192 self
193 }
194
195 pub fn level(&mut self, level: Level) -> &mut Self {
196 self.level = level;
197 self
198 }
199
200 pub fn remote_level(&mut self, level: Level) -> &mut Self {
201 self.remote_level = level;
202 self
203 }
204
205 pub fn channel_size(&mut self, channel_size: usize) -> &mut Self {
206 self.channel_size = channel_size;
207 self
208 }
209
210 pub fn printer(
211 &mut self, printer: Box<dyn Writer + Send + Sync + 'static>,
212 ) -> &mut Self {
213 self.printer = Some(printer);
214 self
215 }
216
217 pub fn is_async(&mut self, is_async: bool) -> &mut Self {
218 self.is_async = is_async;
219 self
220 }
221
222 pub fn custom_format(
223 &mut self, format: fn(&LogEntry) -> Result<String, fmt::Error>,
224 ) -> &mut Self {
225 self.custom_format = Some(format);
226 self
227 }
228
229 pub fn init(&mut self) { self.build(); }
230
231 pub fn build(&mut self) -> Arc<DiemLogger> {
232 let filter = {
233 let local_filter = {
234 let mut filter_builder = Filter::builder();
235
236 if env::var(RUST_LOG).is_ok() {
237 filter_builder.with_env(RUST_LOG);
238 } else {
239 filter_builder.filter_level(self.level.into());
240 }
241
242 filter_builder.build()
243 };
244 let remote_filter = {
245 let mut filter_builder = Filter::builder();
246
247 if self.is_async && self.address.is_some() {
248 filter_builder.filter_level(self.remote_level.into());
249 } else {
250 filter_builder.filter_level(LevelFilter::Off);
251 }
252
253 filter_builder.build()
254 };
255
256 DiemFilter {
257 local_filter,
258 remote_filter,
259 }
260 };
261
262 let logger = if self.is_async {
263 let (sender, receiver) = mpsc::sync_channel(self.channel_size);
264 let logger = Arc::new(DiemLogger {
265 sender: Some(sender),
266 printer: None,
267 filter: RwLock::new(filter),
268 formatter: self.custom_format.take().unwrap_or(default_format),
269 });
270 let service = LoggerService {
271 receiver,
272 address: self.address.clone(),
273 printer: self.printer.take(),
274 facade: logger.clone(),
275 };
276
277 thread::spawn(move || service.run());
278 logger
279 } else {
280 Arc::new(DiemLogger {
281 sender: None,
282 printer: self.printer.take(),
283 filter: RwLock::new(filter),
284 formatter: self.custom_format.take().unwrap_or(default_format),
285 })
286 };
287
288 crate::logger::set_global_logger(logger.clone());
289 logger
290 }
291}
292
/// Pair of filters guarding the local and the remote output.
struct DiemFilter {
    /// Filter applied to entries written by the local printer.
    local_filter: Filter,
    /// Filter applied to entries sent to the remote endpoint.
    remote_filter: Filter,
}
300
301impl DiemFilter {
302 fn enabled(&self, metadata: &Metadata) -> bool {
303 self.local_filter.enabled(metadata)
304 || self.remote_filter.enabled(metadata)
305 }
306}
307
/// Logger facade that either prints entries directly or forwards them to a
/// background service over a channel.
pub struct DiemLogger {
    /// Sending half of the service channel; `Some` only for async loggers.
    sender: Option<SyncSender<LoggerServiceEvent>>,
    /// Writer used for direct output; `Some` only for synchronous loggers.
    printer: Option<Box<dyn Writer>>,
    /// Local and remote filters, replaceable at runtime.
    filter: RwLock<DiemFilter>,
    /// Function turning an entry into the line handed to the printer.
    pub(crate) formatter: fn(&LogEntry) -> Result<String, fmt::Error>,
}
314
315impl DiemLogger {
316 pub fn builder() -> DiemLoggerBuilder { DiemLoggerBuilder::new() }
317
318 #[allow(clippy::new_ret_no_self)]
319 pub fn new() -> DiemLoggerBuilder { Self::builder() }
320
321 pub fn init_for_testing() {
322 if env::var(RUST_LOG).is_err() {
323 return;
324 }
325
326 Self::builder()
327 .is_async(false)
328 .printer(Box::new(StderrWriter))
329 .build();
330 }
331
332 pub fn set_filter(&self, filter: Filter) {
333 self.filter.write().local_filter = filter;
334 }
335
336 pub fn set_remote_filter(&self, filter: Filter) {
337 self.filter.write().remote_filter = filter;
338 }
339
340 fn send_entry(&self, entry: LogEntry) {
341 if let Some(printer) = &self.printer {
342 let s = (self.formatter)(&entry).expect("Unable to format");
343 printer.write(s);
344 }
345
346 if let Some(sender) = &self.sender {
347 if let Err(e) = sender.try_send(LoggerServiceEvent::LogEntry(entry))
348 {
349 STRUCT_LOG_QUEUE_ERROR_COUNT.inc();
350 eprintln!("Failed to send structured log: {}", e);
351 }
352 }
353 }
354}
355
356impl Logger for DiemLogger {
357 fn enabled(&self, metadata: &Metadata) -> bool {
358 self.filter.read().enabled(metadata)
359 }
360
361 fn record(&self, event: &Event) {
362 let entry = LogEntry::new(event, ::std::thread::current().name());
363
364 self.send_entry(entry)
365 }
366
367 fn flush(&self) {
368 if let Some(sender) = &self.sender {
369 let (oneshot_sender, oneshot_receiver) = mpsc::sync_channel(1);
370 sender
371 .send(LoggerServiceEvent::Flush(oneshot_sender))
372 .unwrap();
373 oneshot_receiver.recv().unwrap();
374 }
375 }
376}
377
/// Messages sent from the logger facade to the background service.
enum LoggerServiceEvent {
    /// An entry to print locally and/or send to the remote endpoint.
    LogEntry(LogEntry),
    /// A flush request; the service answers on the enclosed sender.
    Flush(SyncSender<()>),
}
382
/// Background worker that consumes queued events for an async logger.
struct LoggerService {
    /// Receiving half of the channel fed by the facade.
    receiver: Receiver<LoggerServiceEvent>,
    /// Remote endpoint address, if remote logging is configured.
    address: Option<String>,
    /// Writer used for local output, if any.
    printer: Option<Box<dyn Writer>>,
    /// The logger facade, consulted for the current filters and formatter.
    facade: Arc<DiemLogger>,
}
391
392impl LoggerService {
393 pub fn run(mut self) {
394 let mut writer = self.address.take().map(TcpWriter::new);
395
396 for event in self.receiver {
397 match event {
398 LoggerServiceEvent::LogEntry(entry) => {
399 PROCESSED_STRUCT_LOG_COUNT.inc();
400
401 if let Some(printer) = &self.printer {
402 if self
403 .facade
404 .filter
405 .read()
406 .local_filter
407 .enabled(&entry.metadata)
408 {
409 let s = (self.facade.formatter)(&entry)
410 .expect("Unable to format");
411 printer.write(s)
412 }
413 }
414
415 if let Some(writer) = &mut writer {
416 if self
417 .facade
418 .filter
419 .read()
420 .remote_filter
421 .enabled(&entry.metadata)
422 {
423 Self::write_to_logstash(writer, entry);
424 }
425 }
426 }
427 LoggerServiceEvent::Flush(sender) => {
428 let _ = sender.send(());
432 }
433 }
434 }
435 }
436
437 fn write_to_logstash(stream: &mut TcpWriter, mut entry: LogEntry) {
440 if entry.message.is_none() {
443 entry.message = Some(serde_json::to_string(&entry.data).unwrap());
444 }
445
446 let message = if let Ok(json) = serde_json::to_string(&entry) {
447 json
448 } else {
449 STRUCT_LOG_PARSE_ERROR_COUNT.inc();
450 return;
451 };
452
453 let message = message + "\n";
454 let bytes = message.as_bytes();
455 let message_length = bytes.len();
456
457 let mut result = stream.write_all(bytes);
461 for _ in 0..NUM_SEND_RETRIES {
462 if result.is_ok() {
463 break;
464 } else {
465 result = stream.write_all(bytes);
466 }
467 }
468
469 if let Err(e) = result {
470 STRUCT_LOG_SEND_ERROR_COUNT.inc();
471 eprintln!(
472 "[Logging] Error while sending data to logstash({}): {}",
473 stream.endpoint(),
474 e
475 );
476 } else {
477 SENT_STRUCT_LOG_COUNT.inc();
478 SENT_STRUCT_LOG_BYTES.inc_by(message_length as u64);
479 }
480 }
481}
482
/// Destination for formatted log lines; must be shareable across threads.
pub trait Writer: Send + Sync {
    /// Writes one formatted log line.
    fn write(&self, log: String);
}
488
/// Writer that prints every log line to standard error.
struct StderrWriter;

impl Writer for StderrWriter {
    /// Prints `log` followed by a newline to stderr.
    fn write(&self, log: String) {
        eprintln!("{}", log);
    }
}
498
/// Writer that appends log lines to a single file.
pub struct FileWriter {
    /// The open log file; the lock is taken for writing on every log line.
    log_file: RwLock<std::fs::File>,
}
503
504impl FileWriter {
505 pub fn new(log_file: std::path::PathBuf) -> Self {
506 let file = std::fs::OpenOptions::new()
507 .append(true)
508 .create(true)
509 .open(log_file)
510 .expect("Unable to open log file");
511 Self {
512 log_file: RwLock::new(file),
513 }
514 }
515}
516
517impl Writer for FileWriter {
518 fn write(&self, log: String) {
520 if let Err(err) = writeln!(self.log_file.write(), "{}", log) {
521 eprintln!("Unable to write to log file: {}", err.to_string());
522 }
523 }
524}
525
/// Writer backed by a `PipeLogger` configured for size-based rotation.
pub struct RollingFileWriter {
    /// The rotating logger, locked for each written line.
    log_file: Mutex<PipeLogger>,
}
529
530impl RollingFileWriter {
531 pub fn new(
532 log_path: std::path::PathBuf, count: usize, size_mb: usize,
533 ) -> Self {
534 let mut builder = PipeLoggerBuilder::new(log_path);
535 builder
536 .set_compress(true)
537 .set_rotate(Some(RotateMethod::FileSize(
538 size_mb as u64 * 1_000_000,
539 )))
540 .set_count(Some(count));
541 let log_file = builder.build().unwrap();
542 Self {
543 log_file: Mutex::new(log_file),
544 }
545 }
546}
547
548impl Writer for RollingFileWriter {
549 fn write(&self, log: String) {
550 if let Err(e) = self.log_file.lock().write_line(&log) {
551 eprintln!("Unable to write to log file: {}", e.to_string());
552 }
553 }
554}
555
556fn default_format(entry: &LogEntry) -> Result<String, fmt::Error> {
562 use std::fmt::Write;
563
564 let mut w = String::new();
565 write!(w, "{}", entry.timestamp)?;
566
567 if let Some(thread_name) = &entry.thread_name {
568 write!(w, " [{}]", thread_name)?;
569 }
570
571 write!(
572 w,
573 " {} {}",
574 entry.metadata.level(),
575 entry.metadata.location()
576 )?;
577
578 if let Some(message) = &entry.message {
579 write!(w, " {}", message)?;
580 }
581
582 if !entry.data.is_empty() {
583 write!(w, " {}", serde_json::to_string(&entry.data).unwrap())?;
584 }
585
586 Ok(w)
587}
588
#[cfg(test)]
mod tests {
    use super::LogEntry;
    use crate::{
        debug, error, info, logger::Logger, trace, warn, Event, Key, KeyValue,
        Level, Metadata, Schema, Value, Visitor,
    };
    use chrono::{DateTime, Utc};
    use serde_json::Value as JsonValue;
    use std::{
        sync::{
            mpsc::{self, Receiver, SyncSender},
            Arc,
        },
        thread,
    };

    /// Serializable enum used to check that serde renaming reaches the
    /// structured data.
    #[derive(serde::Serialize)]
    #[serde(rename_all = "snake_case")]
    enum Enum {
        FooBar,
    }

    /// Schema with one owned and one borrowed field.
    struct TestSchema<'a> {
        foo: usize,
        bar: &'a Enum,
    }

    impl Schema for TestSchema<'_> {
        // Exposes both fields as serde-backed values.
        fn visit(&self, visitor: &mut dyn Visitor) {
            visitor.visit_pair(Key::new("foo"), Value::from_serde(&self.foo));
            visitor.visit_pair(Key::new("bar"), Value::from_serde(&self.bar));
        }
    }

    /// Test logger that forwards every recorded entry over a channel.
    struct LogStream(SyncSender<LogEntry>);

    impl LogStream {
        // Returns the logger together with the receiving end of its channel.
        fn new() -> (Self, Receiver<LogEntry>) {
            let (sender, receiver) = mpsc::sync_channel(1024);
            (Self(sender), receiver)
        }
    }

    impl Logger for LogStream {
        fn enabled(&self, metadata: &Metadata) -> bool {
            metadata.level() <= Level::Debug
        }

        fn record(&self, event: &Event) {
            let entry = LogEntry::new(event, ::std::thread::current().name());
            self.0.send(entry).unwrap();
        }

        fn flush(&self) {}
    }

    // Installs a `LogStream` as the global logger and returns its receiver.
    fn set_test_logger() -> Receiver<LogEntry> {
        let (logger, receiver) = LogStream::new();
        let logger = Arc::new(logger);
        crate::logger::set_global_logger(logger);
        receiver
    }

    // NOTE: this test installs a process-global logger, so all checks live in
    // a single test function.
    #[test]
    fn basic() {
        let receiver = set_test_logger();
        let number = 12345;

        // Emit an info log carrying a schema, key/value pairs and a message.
        let before = Utc::now();
        info!(
            TestSchema {
                foo: 5,
                bar: &Enum::FooBar
            },
            test = true,
            category = "name",
            KeyValue::new("display", Value::from_display(&number)),
            "This is a log"
        );
        let after = Utc::now();

        let entry = receiver.recv().unwrap();

        // The standard fields must be filled in.
        assert_eq!(entry.metadata.level(), Level::Info);
        assert_eq!(
            entry.metadata.target(),
            module_path!().split("::").next().unwrap()
        );
        assert_eq!(entry.metadata.module_path(), module_path!());
        assert_eq!(entry.metadata.file(), file!());
        assert_eq!(entry.message.as_deref(), Some("This is a log"));
        assert!(entry.backtrace.is_none());

        // The timestamp must fall between the two instants sampled around
        // the log call.
        let timestamp = DateTime::parse_from_rfc3339(&entry.timestamp).unwrap();
        let timestamp: DateTime<Utc> = DateTime::from(timestamp);
        assert!(before <= timestamp && timestamp <= after);

        // The structured data must keep its JSON types.
        assert_eq!(entry.data.get("foo").and_then(JsonValue::as_u64), Some(5));
        assert_eq!(
            entry.data.get("bar").and_then(JsonValue::as_str),
            Some("foo_bar")
        );
        assert_eq!(
            entry.data.get("display").and_then(JsonValue::as_str),
            Some(format!("{}", number)).as_deref(),
        );
        assert_eq!(
            entry.data.get("test").and_then(JsonValue::as_bool),
            Some(true),
        );
        assert_eq!(
            entry.data.get("category").and_then(JsonValue::as_str),
            Some("name"),
        );

        // Error logs must carry a backtrace.
        error!("This is an error log");
        let entry = receiver.recv().unwrap();
        assert!(entry.backtrace.is_some());

        // Log once at every level. Only four entries are expected back,
        // starting at Debug - i.e. the trace event is expected to be dropped.
        trace!("trace");
        debug!("debug");
        info!("info");
        warn!("warn");
        error!("error");

        let levels = &[Level::Debug, Level::Info, Level::Warn, Level::Error];

        for level in levels {
            let entry = receiver.recv().unwrap();
            assert_eq!(entry.metadata.level(), *level);
        }

        // The name of the logging thread must be recorded.
        let handler = thread::Builder::new()
            .name("named thread".into())
            .spawn(|| info!("thread"))
            .unwrap();

        handler.join().unwrap();
        let entry = receiver.recv().unwrap();
        assert_eq!(entry.thread_name.as_deref(), Some("named thread"));

        // Exercise the `?` (Debug) and `%` (Display) key/value syntaxes with
        // identifier and literal keys; the resulting entries are not
        // inspected, so this only checks that the macros accept these forms.
        let debug_struct = DebugStruct {};
        let display_struct = DisplayStruct {};

        error!(identifier = ?debug_struct, "Debug test");
        error!(identifier = ?debug_struct, other = "value", "Debug2 test");
        error!(identifier = %display_struct, "Display test");
        error!(identifier = %display_struct, other = "value", "Display2 test");
        error!("Literal" = ?debug_struct, "Debug test");
        error!("Literal" = ?debug_struct, other = "value", "Debug test");
        error!("Literal" = %display_struct, "Display test");
        error!("Literal" = %display_struct, other = "value", "Display2 test");
        error!("Literal" = %display_struct, other = "value", identifier = ?debug_struct, "Mixed test");
    }

    /// Type with a hand-written `Debug` implementation.
    struct DebugStruct {}

    impl std::fmt::Debug for DebugStruct {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "DebugStruct!")
        }
    }

    /// Type with a hand-written `Display` implementation.
    struct DisplayStruct {}

    impl std::fmt::Display for DisplayStruct {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "DisplayStruct!")
        }
    }
}