1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0
// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
//! Implementations for sending logs to external log processors, e.g. Logstash.
//!
//! Handles sending logs under disconnects, and retries. Tries to continue to
//! make progress on a log but eventually drops older logs to continue to make
//! progress on newer logs.
use crate::counters::{
STRUCT_LOG_CONNECT_ERROR_COUNT, STRUCT_LOG_TCP_CONNECT_COUNT,
};
use std::{
io::{self, Write},
net::{TcpStream, ToSocketAddrs},
time::Duration,
};
/// Write timeout, in milliseconds, set on every stream right after it
/// connects.
const WRITE_TIMEOUT_MS: u64 = 2000;
/// Timeout, in milliseconds, for each individual TCP connection attempt to a
/// resolved address.
const CONNECTION_TIMEOUT_MS: u64 = 5000;
/// A wrapper for `TcpStream` that handles reconnecting to the endpoint
/// automatically
///
/// The writer starts out disconnected. `TcpWriter::write()` will block on the
/// message until it is connected, and a failed write drops the current stream
/// so that the next write reconnects first.
pub(crate) struct TcpWriter {
/// The DNS name or IP address logs are being sent to
endpoint: String,
/// The `TcpStream` to write to, which will be `None` when disconnected
stream: Option<TcpStream>,
}
impl TcpWriter {
    /// Creates a writer for `endpoint` in the disconnected state.
    ///
    /// No connection is attempted here; the stream is only established
    /// through `refresh_connection`.
    pub fn new(endpoint: String) -> Self {
        Self {
            endpoint,
            stream: None,
        }
    }

    /// Returns the endpoint (DNS name or IP address) this writer sends to.
    pub fn endpoint(&self) -> &str { &self.endpoint }

    /// Ensure that we get a connection, no matter how long it takes
    ///
    /// This will block until there is a connection: every failed attempt is
    /// reported on stderr, counted in `STRUCT_LOG_CONNECT_ERROR_COUNT`, and
    /// retried after a one second pause.
    fn refresh_connection(&mut self) {
        loop {
            match self.connect() {
                Ok(stream) => {
                    self.stream = Some(stream);
                    return;
                }
                Err(e) => {
                    eprintln!("[Logging] Failed to connect: {}", e);
                    STRUCT_LOG_CONNECT_ERROR_COUNT.inc();
                }
            }

            // Sleep a second so this doesn't just spin as fast as possible
            std::thread::sleep(Duration::from_millis(1000));
        }
    }

    /// Connect and ensure the write timeout is set
    ///
    /// Resolves the endpoint and tries each resolved address in order,
    /// returning the first stream that both connects and accepts the write
    /// timeout.
    ///
    /// # Errors
    ///
    /// Returns the resolution error if the endpoint cannot be resolved, the
    /// most recent connect / set-timeout error if every address failed, or a
    /// generic `Other` error if resolution yielded no addresses at all.
    fn connect(&mut self) -> io::Result<TcpStream> {
        STRUCT_LOG_TCP_CONNECT_COUNT.inc();

        // Most recent failure seen while walking the resolved addresses.
        let mut last_error = None;

        // resolve addresses to handle DNS names
        for addr in self.endpoint.to_socket_addrs()? {
            let stream = match TcpStream::connect_timeout(
                &addr,
                Duration::from_millis(CONNECTION_TIMEOUT_MS),
            ) {
                Ok(stream) => stream,
                Err(err) => {
                    last_error = Some(err);
                    continue;
                }
            };

            // Set the write timeout; a stream without it is not usable, so
            // move on to the next address if this fails.
            match stream.set_write_timeout(Some(Duration::from_millis(
                WRITE_TIMEOUT_MS,
            ))) {
                Ok(()) => return Ok(stream),
                Err(err) => {
                    STRUCT_LOG_CONNECT_ERROR_COUNT.inc();
                    eprintln!(
                        "[Logging] Failed to set write timeout: {}",
                        err
                    );
                    // Keep the real cause so the caller does not get a
                    // misleading (generic or stale) error.
                    last_error = Some(err);
                }
            }
        }

        // Only build the generic error when there is nothing more specific.
        Err(last_error.unwrap_or_else(|| {
            io::Error::new(
                io::ErrorKind::Other,
                format!("Unable to resolve and connect to {}", self.endpoint),
            )
        }))
    }
}
impl Write for TcpWriter {
    /// Writes `buf` to the underlying stream, connecting first if needed.
    ///
    /// When there is no stream, this blocks in `refresh_connection` until one
    /// is established. If the write itself fails, the stream is dropped so
    /// the next call reconnects, and the error is returned to the caller.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Make sure there is a stream to write to before attempting anything
        if self.stream.is_none() {
            self.refresh_connection();
        }

        // A message cut off mid send is not recovered here; the only
        // guarantee is that a connection exists before the write is tried
        let outcome = match self.stream.as_mut() {
            Some(stream) => stream.write(buf),
            None => {
                Err(io::Error::new(io::ErrorKind::NotConnected, "No stream"))
            }
        };

        // Any failure invalidates the stream so the next write reconnects
        if outcome.is_err() {
            self.stream = None;
        }
        outcome
    }

    /// Flushes the underlying stream.
    ///
    /// Returns a `NotConnected` error when there is currently no stream;
    /// unlike `write`, this never tries to reconnect.
    fn flush(&mut self) -> io::Result<()> {
        match self.stream.as_ref() {
            Some(mut stream) => stream.flush(),
            None => Err(io::Error::new(
                io::ErrorKind::NotConnected,
                "Can't flush, not connected",
            )),
        }
    }
}