Patch-Source: https://github.com/vectordotdev/vector/commit/badeb4a2e34568ac35ba1cbd755eb0a3085fe4a7 Patch-Source: https://github.com/vectordotdev/vector/commit/fffcea5b3868c5c0e1e499fba0bba6915b6199c3#diff-28dc54a11ac02deff1ab3b7a643983c3e44a9058e2fa603492e16bd23d8a5e76 diff --git a/lib/enrichment/src/vrl_util.rs b/lib/enrichment/src/vrl_util.rs index 0beb948cc435b..9bf924f030341 100644 --- a/lib/enrichment/src/vrl_util.rs +++ b/lib/enrichment/src/vrl_util.rs @@ -38,7 +38,7 @@ impl DiagnosticMessage for Error { } /// Evaluates the condition object to search the enrichment tables with. -pub(crate) fn evaluate_condition(key: &str, value: Value) -> ExpressionResult { +pub(crate) fn evaluate_condition(key: &str, value: Value) -> ExpressionResult> { Ok(match value { Value::Object(map) if map.contains_key("from") && map.contains_key("to") => { Condition::BetweenDates { diff --git a/lib/file-source/src/fingerprinter.rs b/lib/file-source/src/fingerprinter.rs index 2fdd6fb6f4e28..89b6d89e0e1cb 100644 --- a/lib/file-source/src/fingerprinter.rs +++ b/lib/file-source/src/fingerprinter.rs @@ -122,9 +122,9 @@ impl UncompressedReader for UncompressedReaderImpl { fp.seek(SeekFrom::Start(0))?; let result = fp.read_exact(&mut magic); - if result.is_err() { + if let Err(e) = result { fp.seek(SeekFrom::Start(0))?; - return Err(result.unwrap_err()); + return Err(e); } if magic == magic_header_bytes { diff --git a/lib/prometheus-parser/src/line.rs b/lib/prometheus-parser/src/line.rs index 86b1118f3770c..a3a39a3d8ff1a 100644 --- a/lib/prometheus-parser/src/line.rs +++ b/lib/prometheus-parser/src/line.rs @@ -113,7 +113,7 @@ impl Metric { /// ``` /// /// We don't parse timestamp. - fn parse(input: &str) -> IResult { + fn parse(input: &str) -> IResult<'_, Self> { let input = trim_space(input); let (input, name) = parse_name(input)?; let (input, labels) = Self::parse_labels(input)?; @@ -131,7 +131,7 @@ impl Metric { } /// Float value, and +Inf, -Int, Nan. - pub(crate) fn parse_value(input: &str) -> IResult { + pub(crate) fn parse_value(input: &str) -> IResult<'_, f64> { let input = trim_space(input); alt(( value(f64::INFINITY, tag("+Inf")), @@ -151,7 +151,7 @@ impl Metric { }) } - fn parse_timestamp(input: &str) -> IResult> { + fn parse_timestamp(input: &str) -> IResult<'_, Option> { let input = trim_space(input); opt(map_res( recognize(pair(opt(char('-')), digit1)), @@ -166,7 +166,7 @@ impl Metric { }) } - fn parse_name_value(input: &str) -> IResult<(String, String)> { + fn parse_name_value(input: &str) -> IResult<'_, (String, String)> { map( (parse_name, match_char('='), Self::parse_escaped_string), |(name, _, value)| (name, value), @@ -178,7 +178,7 @@ impl Metric { // - Some((name, value)) => success // - None => list is properly ended with "}" // - Error => errors of parse_name_value - fn element_parser(input: &str) -> IResult> { + fn element_parser(input: &str) -> IResult<'_, Option<(String, String)>> { match Self::parse_name_value(input) { Ok((input, result)) => Ok((input, Some(result))), Err(nom::Err::Error(parse_name_value_error)) => match match_char('}')(input) { @@ -190,7 +190,7 @@ impl Metric { } } - fn parse_labels_inner(mut input: &str) -> IResult> { + fn parse_labels_inner(mut input: &str) -> IResult<'_, BTreeMap> { let sep = match_char(','); let mut result = BTreeMap::new(); @@ -225,7 +225,7 @@ impl Metric { } /// Parse `{label_name="value",...}` - fn parse_labels(input: &str) -> IResult> { + fn parse_labels(input: &str) -> IResult<'_, BTreeMap> { let input = trim_space(input); match opt(char('{')).parse(input) { @@ -238,7 +238,7 @@ impl Metric { /// Parse `'"' string_content '"'`. `string_content` can contain any unicode characters, /// backslash (`\`), double-quote (`"`), and line feed (`\n`) characters have to be /// escaped as `\\`, `\"`, and `\n`, respectively. - fn parse_escaped_string(input: &str) -> IResult { + fn parse_escaped_string(input: &str) -> IResult<'_, String> { #[derive(Debug)] enum StringFragment<'a> { Literal(&'a str), @@ -274,7 +274,7 @@ impl Metric { }, ); - fn match_quote(input: &str) -> IResult { + fn match_quote(input: &str) -> IResult<'_, char> { char('"')(input).map_err(|_: NomError| { ErrorKind::ExpectedChar { expected: '"', @@ -289,7 +289,7 @@ impl Metric { } impl Header { - fn space1(input: &str) -> IResult<()> { + fn space1(input: &str) -> IResult<'_, ()> { take_while1(|c| c == ' ' || c == '\t')(input) .map_err(|_: NomError| { ErrorKind::ExpectedSpace { @@ -301,7 +301,7 @@ impl Header { } /// `# TYPE ` - fn parse(input: &str) -> IResult { + fn parse(input: &str) -> IResult<'_, Self> { let input = trim_space(input); let (input, _) = char('#')(input).map_err(|_: NomError| ErrorKind::ExpectedChar { expected: '#', @@ -372,7 +372,7 @@ impl Line { } /// Name matches the regex `[a-zA-Z_][a-zA-Z0-9_]*`. -fn parse_name(input: &str) -> IResult { +fn parse_name(input: &str) -> IResult<'_, String> { let input = trim_space(input); let (input, (a, b)) = pair( take_while1(|c: char| c.is_alphabetic() || c == '_'), diff --git a/lib/vector-buffers/src/test/messages.rs b/lib/vector-buffers/src/test/messages.rs index ba9040cc498ad..628831fb5e067 100644 --- a/lib/vector-buffers/src/test/messages.rs +++ b/lib/vector-buffers/src/test/messages.rs @@ -265,42 +265,3 @@ impl FixedEncodable for MultiEventRecord { Ok(MultiEventRecord::new(event_count)) } } - -message_wrapper!(PoisonPillMultiEventRecord: u32, |m: &Self| m.0); - -impl PoisonPillMultiEventRecord { - pub fn encoded_size(&self) -> usize { - usize::try_from(self.0).unwrap_or(usize::MAX) + std::mem::size_of::() - } -} - -impl FixedEncodable for PoisonPillMultiEventRecord { - type EncodeError = io::Error; - type DecodeError = io::Error; - - fn encode(self, buffer: &mut B) -> Result<(), Self::EncodeError> - where - B: BufMut, - { - if buffer.remaining_mut() < self.encoded_size() { - return Err(io::Error::other("not enough capacity to encode record")); - } - - buffer.put_u32(self.0); - buffer.put_bytes(0x42, usize::try_from(self.0).unwrap_or(usize::MAX)); - Ok(()) - } - - fn decode(mut buffer: B) -> Result - where - B: Buf, - { - let event_count = buffer.get_u32(); - if event_count == 42 { - return Err(io::Error::other("failed to decode")); - } - - buffer.advance(usize::try_from(event_count).unwrap_or(usize::MAX)); - Ok(PoisonPillMultiEventRecord::new(event_count)) - } -} diff --git a/lib/vector-config-macros/src/ast/container.rs b/lib/vector-config-macros/src/ast/container.rs index c3e9913f5612c..be38e739e9877 100644 --- a/lib/vector-config-macros/src/ast/container.rs +++ b/lib/vector-config-macros/src/ast/container.rs @@ -280,7 +280,7 @@ impl<'a> Container<'a> { /// Data for the container. /// /// This would be the fields of a struct, or the variants for an enum. - pub fn data(&self) -> &Data { + pub fn data(&self) -> &Data<'_> { &self.data } diff --git a/lib/vector-config/src/schema/parser/component.rs b/lib/vector-config/src/schema/parser/component.rs index da86f25ced59d..2213d11d501ad 100644 --- a/lib/vector-config/src/schema/parser/component.rs +++ b/lib/vector-config/src/schema/parser/component.rs @@ -61,7 +61,7 @@ impl ComponentSchema<'_> { } impl QueryableSchema for ComponentSchema<'_> { - fn schema_type(&self) -> SchemaType { + fn schema_type(&self) -> SchemaType<'_> { self.schema.schema_type() } diff --git a/lib/vector-config/src/schema/parser/query.rs b/lib/vector-config/src/schema/parser/query.rs index c4ad92d1f4cc2..45b1fa8cabcfa 100644 --- a/lib/vector-config/src/schema/parser/query.rs +++ b/lib/vector-config/src/schema/parser/query.rs @@ -224,7 +224,7 @@ pub enum SchemaType<'a> { } pub trait QueryableSchema { - fn schema_type(&self) -> SchemaType; + fn schema_type(&self) -> SchemaType<'_>; fn description(&self) -> Option<&str>; fn title(&self) -> Option<&str>; fn get_attributes(&self, key: &str) -> Option>; @@ -236,7 +236,7 @@ impl QueryableSchema for &T where T: QueryableSchema, { - fn schema_type(&self) -> SchemaType { + fn schema_type(&self) -> SchemaType<'_> { (*self).schema_type() } @@ -262,7 +262,7 @@ where } impl QueryableSchema for &SchemaObject { - fn schema_type(&self) -> SchemaType { + fn schema_type(&self) -> SchemaType<'_> { // TODO: Technically speaking, it is allowed to use the "X of" schema types in conjunction // with other schema types i.e. `allOf` in conjunction with specifying a `type`. // @@ -386,7 +386,7 @@ impl<'a> From<&'a SchemaObject> for SimpleSchema<'a> { } impl QueryableSchema for SimpleSchema<'_> { - fn schema_type(&self) -> SchemaType { + fn schema_type(&self) -> SchemaType<'_> { self.schema.schema_type() } diff --git a/lib/vector-core/src/event/array.rs b/lib/vector-core/src/event/array.rs index f6e2768167bca..c61677899ae3c 100644 --- a/lib/vector-core/src/event/array.rs +++ b/lib/vector-core/src/event/array.rs @@ -173,7 +173,7 @@ impl EventArray { } /// Iterate over references to this array's events. - pub fn iter_events(&self) -> impl Iterator { + pub fn iter_events(&self) -> impl Iterator> { match self { Self::Logs(array) => EventArrayIter::Logs(array.iter()), Self::Metrics(array) => EventArrayIter::Metrics(array.iter()), @@ -182,7 +182,7 @@ impl EventArray { } /// Iterate over mutable references to this array's events. - pub fn iter_events_mut(&mut self) -> impl Iterator { + pub fn iter_events_mut(&mut self) -> impl Iterator> { match self { Self::Logs(array) => EventArrayIterMut::Logs(array.iter_mut()), Self::Metrics(array) => EventArrayIterMut::Metrics(array.iter_mut()), diff --git a/lib/vector-core/src/event/log_event.rs b/lib/vector-core/src/event/log_event.rs index 5e3355714b36b..91ff892283a8b 100644 --- a/lib/vector-core/src/event/log_event.rs +++ b/lib/vector-core/src/event/log_event.rs @@ -1033,9 +1033,8 @@ mod test { fn json_value_to_vector_log_event_to_json_value() { const FIXTURE_ROOT: &str = "tests/data/fixtures/log_event"; - std::fs::read_dir(FIXTURE_ROOT) - .unwrap() - .for_each(|fixture_file| match fixture_file { + for fixture_file in std::fs::read_dir(FIXTURE_ROOT).unwrap() { + match fixture_file { Ok(fixture_file) => { let path = fixture_file.path(); tracing::trace!(?path, "Opening."); @@ -1047,7 +1046,8 @@ mod test { assert_eq!(serde_value, serde_value_again); } _ => panic!("This test should never read Err'ing test fixtures."), - }); + } + } } fn assert_merge_value( diff --git a/lib/vector-core/src/event/util/log/all_fields.rs b/lib/vector-core/src/event/util/log/all_fields.rs index 5194f05cb707d..2e1511c124e42 100644 --- a/lib/vector-core/src/event/util/log/all_fields.rs +++ b/lib/vector-core/src/event/util/log/all_fields.rs @@ -11,30 +11,30 @@ static IS_VALID_PATH_SEGMENT: LazyLock = /// Iterates over all paths in form `a.b[0].c[1]` in alphabetical order /// and their corresponding values. -pub fn all_fields(fields: &ObjectMap) -> FieldsIter { +pub fn all_fields(fields: &ObjectMap) -> FieldsIter<'_> { FieldsIter::new(None, fields, true) } /// Iterates over all paths in form `a.b[0].c[1]` in alphabetical order and their corresponding /// values. Field names containing meta-characters are not quoted. -pub fn all_fields_unquoted(fields: &ObjectMap) -> FieldsIter { +pub fn all_fields_unquoted(fields: &ObjectMap) -> FieldsIter<'_> { FieldsIter::new(None, fields, false) } /// Same functionality as `all_fields` but it prepends a character that denotes the /// path type. -pub fn all_metadata_fields(fields: &ObjectMap) -> FieldsIter { +pub fn all_metadata_fields(fields: &ObjectMap) -> FieldsIter<'_> { FieldsIter::new(Some(PathPrefix::Metadata), fields, true) } /// An iterator with a single "message" element -pub fn all_fields_non_object_root(value: &Value) -> FieldsIter { +pub fn all_fields_non_object_root(value: &Value) -> FieldsIter<'_> { FieldsIter::non_object(value) } /// An iterator similar to `all_fields`, but instead of visiting each array element individually, /// it treats the entire array as a single value. -pub fn all_fields_skip_array_elements(fields: &ObjectMap) -> FieldsIter { +pub fn all_fields_skip_array_elements(fields: &ObjectMap) -> FieldsIter<'_> { FieldsIter::new_with_skip_array_elements(fields) } diff --git a/lib/vector-core/src/fanout.rs b/lib/vector-core/src/fanout.rs index c6368598f3249..746f47ae7179b 100644 --- a/lib/vector-core/src/fanout.rs +++ b/lib/vector-core/src/fanout.rs @@ -622,7 +622,10 @@ mod tests { fanout.send(clones, None).await.expect("should not fail"); for receiver in receivers { - assert_eq!(collect_ready(receiver.into_stream()), &[events.clone()]); + assert_eq!( + collect_ready(receiver.into_stream()), + std::slice::from_ref(&events) + ); } } diff --git a/lib/vector-core/src/transform/mod.rs b/lib/vector-core/src/transform/mod.rs index ed4e5f1d91e58..f8f31e7eb1537 100644 --- a/lib/vector-core/src/transform/mod.rs +++ b/lib/vector-core/src/transform/mod.rs @@ -470,7 +470,7 @@ impl OutputBuffer { self.0.capacity() } - pub fn first(&self) -> Option { + pub fn first(&self) -> Option> { self.0.first().and_then(|first| match first { EventArray::Logs(l) => l.first().map(Into::into), EventArray::Metrics(m) => m.first().map(Into::into), @@ -494,11 +494,11 @@ impl OutputBuffer { Ok(()) } - fn iter_events(&self) -> impl Iterator { + fn iter_events(&self) -> impl Iterator> { self.0.iter().flat_map(EventArray::iter_events) } - fn events_mut(&mut self) -> impl Iterator { + fn events_mut(&mut self) -> impl Iterator> { self.0.iter_mut().flat_map(EventArray::iter_events_mut) } diff --git a/rust-toolchain.toml b/rust-toolchain.toml index c2ee2a8766ad1..b6981e6398a20 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.88" +channel = "1.89" profile = "default" diff --git a/src/api/schema/components/sink.rs b/src/api/schema/components/sink.rs index e92c2133f7fc7..99bcca38cc634 100644 --- a/src/api/schema/components/sink.rs +++ b/src/api/schema/components/sink.rs @@ -28,7 +28,7 @@ impl Sink { &self.0.component_key } - pub fn get_component_type(&self) -> &str { + pub const fn get_component_type(&self) -> &str { self.0.component_type.as_str() } } diff --git a/src/api/schema/components/source.rs b/src/api/schema/components/source.rs index b10846803601f..c2289c4b6a15b 100644 --- a/src/api/schema/components/source.rs +++ b/src/api/schema/components/source.rs @@ -43,7 +43,7 @@ impl Source { pub fn get_component_key(&self) -> &ComponentKey { &self.0.component_key } - pub fn get_component_type(&self) -> &str { + pub const fn get_component_type(&self) -> &str { self.0.component_type.as_str() } pub fn get_output_types(&self) -> Vec { diff --git a/src/api/schema/components/transform.rs b/src/api/schema/components/transform.rs index 6c8b50eb205e8..7fe95238b7dda 100644 --- a/src/api/schema/components/transform.rs +++ b/src/api/schema/components/transform.rs @@ -28,7 +28,7 @@ impl Transform { pub const fn get_component_key(&self) -> &ComponentKey { &self.0.component_key } - pub fn get_component_type(&self) -> &str { + pub const fn get_component_type(&self) -> &str { self.0.component_type.as_str() } pub fn get_outputs(&self) -> &[String] { diff --git a/src/api/schema/metrics/host.rs b/src/api/schema/metrics/host.rs index 6a814b1c5f311..a75243253c85b 100644 --- a/src/api/schema/metrics/host.rs +++ b/src/api/schema/metrics/host.rs @@ -1,9 +1,9 @@ -use async_graphql::Object; - use crate::{ event::{Metric, MetricValue}, sources::host_metrics, }; +use async_graphql::Object; +use cfg_if::cfg_if; pub struct MemoryMetrics(Vec); @@ -259,23 +259,27 @@ impl DiskMetrics { } } -pub struct TCPMetrics(Vec); - -#[Object] -impl TCPMetrics { - /// Total TCP connections - async fn tcp_conns_total(&self) -> f64 { - filter_host_metric(&self.0, "tcp_connections_total") - } - - /// Total bytes in the send queue across all connections. - async fn tcp_tx_queued_bytes_total(&self) -> f64 { - filter_host_metric(&self.0, "tcp_tx_queued_bytes_total") - } - - /// Total bytes in the receive queue across all connections. - async fn tcp_rx_queued_bytes_total(&self) -> f64 { - filter_host_metric(&self.0, "tcp_rx_queued_bytes_total") +cfg_if! { + if #[cfg(target_os = "linux")] { + pub struct TCPMetrics(Vec); + + #[Object] + impl TCPMetrics { + /// Total TCP connections + async fn tcp_conns_total(&self) -> f64 { + filter_host_metric(&self.0, "tcp_connections_total") + } + + /// Total bytes in the send queue across all connections. + async fn tcp_tx_queued_bytes_total(&self) -> f64 { + filter_host_metric(&self.0, "tcp_tx_queued_bytes_total") + } + + /// Total bytes in the receive queue across all connections. + async fn tcp_rx_queued_bytes_total(&self) -> f64 { + filter_host_metric(&self.0, "tcp_rx_queued_bytes_total") + } + } } } diff --git a/src/api/schema/metrics/source/file.rs b/src/api/schema/metrics/source/file.rs index 50b89eb69c831..43c2a2751ceda 100644 --- a/src/api/schema/metrics/source/file.rs +++ b/src/api/schema/metrics/source/file.rs @@ -25,7 +25,7 @@ impl<'a> FileSourceMetricFile<'a> { Self { name, metrics } } - pub fn get_name(&self) -> &str { + pub const fn get_name(&self) -> &str { self.name.as_str() } } @@ -215,7 +215,7 @@ mod tests { } } - fn get_metric(&self) -> FileSourceMetricFile { + fn get_metric(&self) -> FileSourceMetricFile<'_> { FileSourceMetricFile::from_tuple(( self.name.to_string(), vec![&self.bytes_metric, &self.events_metric], diff --git a/src/common/http/error.rs b/src/common/http/error.rs index 2edef0b948d99..8e03fc882b9fc 100644 --- a/src/common/http/error.rs +++ b/src/common/http/error.rs @@ -40,7 +40,7 @@ impl ErrorMessage { } /// Returns the error message - pub fn message(&self) -> &str { + pub const fn message(&self) -> &str { self.message.as_str() } } diff --git a/src/components/validation/resources/event.rs b/src/components/validation/resources/event.rs index 065b781abd480..ddd4d72eda5ad 100644 --- a/src/components/validation/resources/event.rs +++ b/src/components/validation/resources/event.rs @@ -3,7 +3,6 @@ use std::collections::HashMap; use bytes::BytesMut; use serde::Deserialize; use serde_json::Value; -use snafu::Snafu; use tokio_util::codec::Encoder as _; use vector_lib::codecs::encoding::format::JsonSerializerOptions; @@ -136,9 +135,6 @@ impl TestEvent { } } -#[derive(Clone, Debug, Eq, PartialEq, Snafu)] -pub enum RawTestEventParseError {} - impl From for TestEvent { fn from(other: RawTestEvent) -> Self { match other { diff --git a/src/enrichment_tables/memory/table.rs b/src/enrichment_tables/memory/table.rs index b7d9bafb8c696..0d97c7ebbcc21 100644 --- a/src/enrichment_tables/memory/table.rs +++ b/src/enrichment_tables/memory/table.rs @@ -375,6 +374,7 @@ impl StreamSink for Memory { mod tests { use futures::{StreamExt, future::ready}; use futures_util::stream; + use std::slice::from_ref; use std::{num::NonZeroU64, time::Duration}; use tokio::time; @@ -483,7 +483,7 @@ mod tests { ("ttl".into(), Value::from(0)), ("value".into(), Value::from(5)), ])), - memory.find_table_row(Case::Sensitive, &[condition.clone()], None, None, None) + memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None) ); // Force scan @@ -549,7 +549,7 @@ mod tests { ("ttl".into(), Value::from(ttl / 2)), ("value".into(), Value::from(5)), ])), - memory.find_table_row(Case::Sensitive, &[condition.clone()], None, None, None) + memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None) ); memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))])); diff --git a/src/generate.rs b/src/generate.rs index 282cfce6457c3..d634c5fbe8482 100644 --- a/src/generate.rs +++ b/src/generate.rs @@ -324,9 +324,9 @@ pub(crate) fn generate_example( }; let file = opts.file.as_ref(); - if file.is_some() { - #[allow(clippy::print_stdout)] - match write_config(file.as_ref().unwrap(), &builder) { + if let Some(path) = file { + match write_config(path, &builder) { + #[allow(clippy::print_stdout)] Ok(_) => { println!( "Config file written to {:?}", diff --git a/src/internal_events/file.rs b/src/internal_events/file.rs index 8ed5b488d2640..252cd7d795375 100644 --- a/src/internal_events/file.rs +++ b/src/internal_events/file.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] // TODO requires optional feature compilation + use metrics::{counter, gauge}; use std::borrow::Cow; use vector_lib::{ diff --git a/src/internal_events/http_client_source.rs b/src/internal_events/http_client_source.rs index 376adf35eda56..7bbcd1d72a6cb 100644 --- a/src/internal_events/http_client_source.rs +++ b/src/internal_events/http_client_source.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] // TODO requires optional feature compilation + use metrics::counter; use vector_lib::internal_event::InternalEvent; use vector_lib::{ diff --git a/src/internal_events/kafka.rs b/src/internal_events/kafka.rs index ea1b198be7053..d62bec3858a23 100644 --- a/src/internal_events/kafka.rs +++ b/src/internal_events/kafka.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] // TODO requires optional feature compilation + use metrics::{counter, gauge}; use vector_lib::internal_event::InternalEvent; use vector_lib::{ diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 070c6951b2e50..b30eddda95ccb 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -2,6 +2,7 @@ pub mod prelude; mod adaptive_concurrency; +#[cfg(feature = "transforms-aggregate")] mod aggregate; #[cfg(any(feature = "sources-amqp", feature = "sinks-amqp"))] mod amqp; @@ -78,6 +79,7 @@ mod kafka; mod kubernetes_logs; #[cfg(feature = "transforms-log_to_metric")] mod log_to_metric; +#[cfg(feature = "sources-heroku_logs")] mod logplex; #[cfg(feature = "sinks-loki")] mod loki; @@ -92,6 +94,11 @@ mod mqtt; #[cfg(feature = "sources-nginx_metrics")] mod nginx_metrics; mod open; +#[cfg(any( + feature = "sources-kubernetes_logs", + feature = "transforms-log_to_metric", + feature = "sinks-datadog_events", +))] mod parser; #[cfg(feature = "sources-postgresql_metrics")] mod postgresql_metrics; @@ -108,7 +115,9 @@ mod pulsar; mod redis; #[cfg(feature = "transforms-impl-reduce")] mod reduce; +#[cfg(feature = "transforms-remap")] mod remap; +#[cfg(feature = "transforms-impl-sample")] mod sample; #[cfg(feature = "sinks-sematext")] mod sematext_metrics; @@ -124,6 +133,7 @@ mod template; #[cfg(feature = "transforms-throttle")] mod throttle; mod udp; +#[cfg(unix)] mod unix; #[cfg(any(feature = "sources-websocket", feature = "sinks-websocket"))] mod websocket; @@ -138,6 +148,8 @@ mod window; feature = "sinks-file", ))] mod file; + +#[cfg(windows)] mod windows; #[cfg(any(feature = "transforms-log_to_metric", feature = "sinks-loki"))] @@ -236,7 +248,11 @@ pub(crate) use self::metric_to_log::*; pub(crate) use self::mqtt::*; #[cfg(feature = "sources-nginx_metrics")] pub(crate) use self::nginx_metrics::*; -#[allow(unused_imports)] +#[cfg(any( + feature = "sources-kubernetes_logs", + feature = "transforms-log_to_metric", + feature = "sinks-datadog_events", +))] pub(crate) use self::parser::*; #[cfg(feature = "sources-postgresql_metrics")] pub(crate) use self::postgresql_metrics::*; @@ -276,6 +292,7 @@ pub(crate) use self::websocket_server::*; pub(crate) use self::window::*; #[cfg(windows)] pub(crate) use self::windows::*; + pub use self::{ adaptive_concurrency::*, batch::*, common::*, conditions::*, encoding_transcode::*, heartbeat::*, http::*, open::*, process::*, socket::*, tcp::*, template::*, udp::*, diff --git a/src/internal_events/parser.rs b/src/internal_events/parser.rs index 686b55c1356a6..46f296a114e85 100644 --- a/src/internal_events/parser.rs +++ b/src/internal_events/parser.rs @@ -1,10 +1,12 @@ +#![allow(dead_code)] // TODO requires optional feature compilation + use std::borrow::Cow; use metrics::counter; use vector_lib::internal_event::InternalEvent; use vector_lib::internal_event::{error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL}; -fn truncate_string_at(s: &str, maxlen: usize) -> Cow { +fn truncate_string_at(s: &str, maxlen: usize) -> Cow<'_, str> { let ellipsis: &str = "[...]"; if s.len() >= maxlen { let mut len = maxlen - ellipsis.len(); diff --git a/src/internal_events/prelude.rs b/src/internal_events/prelude.rs index e01bab6933b9f..820ae81440003 100644 --- a/src/internal_events/prelude.rs +++ b/src/internal_events/prelude.rs @@ -8,6 +8,11 @@ pub(crate) fn http_error_code(code: u16) -> String { format!("http_response_{code}") } +#[cfg(any( + feature = "sources-aws_kinesis_firehose", + feature = "sources-exec", + feature = "sources-heroku_logs", +))] pub(crate) fn io_error_code(error: &std::io::Error) -> &'static str { use std::io::ErrorKind::*; diff --git a/src/internal_events/prometheus.rs b/src/internal_events/prometheus.rs index 6aebb7deb9e79..984fd9980b472 100644 --- a/src/internal_events/prometheus.rs +++ b/src/internal_events/prometheus.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] // TODO requires optional feature compilation + #[cfg(feature = "sources-prometheus-scrape")] use std::borrow::Cow; diff --git a/src/internal_events/pulsar.rs b/src/internal_events/pulsar.rs index 62787ed5e58c6..838a2aadd995e 100644 --- a/src/internal_events/pulsar.rs +++ b/src/internal_events/pulsar.rs @@ -1,6 +1,8 @@ -use metrics::counter; +#![allow(dead_code)] // TODO requires optional feature compilation + #[cfg(feature = "sources-pulsar")] use metrics::Counter; +use metrics::counter; use vector_lib::internal_event::{ error_stage, error_type, ComponentEventsDropped, InternalEvent, UNINTENTIONAL, }; diff --git a/src/internal_events/unix.rs b/src/internal_events/unix.rs index 90caae1e32274..ea91cd483bdd9 100644 --- a/src/internal_events/unix.rs +++ b/src/internal_events/unix.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] // TODO requires optional feature compilation + use std::{io::Error, path::Path}; use metrics::counter; diff --git a/src/sinks/util/adaptive_concurrency/service.rs b/src/sinks/util/adaptive_concurrency/service.rs index bd0ed2ed4ff80..17bd06fdbcdf7 100644 --- a/src/sinks/util/adaptive_concurrency/service.rs +++ b/src/sinks/util/adaptive_concurrency/service.rs @@ -233,7 +233,7 @@ mod tests { } } - fn inner(&self) -> MutexGuard { + fn inner(&self) -> MutexGuard<'_, Inner> { self.inner.lock().unwrap() } } diff --git a/src/sinks/util/snappy.rs b/src/sinks/util/snappy.rs index 8ab59cdda23dc..a369ddb48bc58 100644 --- a/src/sinks/util/snappy.rs +++ b/src/sinks/util/snappy.rs @@ -39,7 +39,7 @@ impl SnappyEncoder { &self.writer } - pub fn is_empty(&self) -> bool { + pub const fn is_empty(&self) -> bool { self.buffer.is_empty() } } diff --git a/src/sources/demo_logs.rs b/src/sources/demo_logs.rs index 6e0de5b7b66e8..210802808028e 100644 --- a/src/sources/demo_logs.rs +++ b/src/sources/demo_logs.rs @@ -175,7 +175,7 @@ impl OutputFormat { } // Ensures that the `lines` list is non-empty if `Shuffle` is chosen - pub(self) fn validate(&self) -> Result<(), DemoLogsConfigError> { + pub(self) const fn validate(&self) -> Result<(), DemoLogsConfigError> { match self { Self::Shuffle { lines, .. } => { if lines.is_empty() { diff --git a/src/sources/exec/mod.rs b/src/sources/exec/mod.rs index dd040658e2dba..4512f44394a48 100644 --- a/src/sources/exec/mod.rs +++ b/src/sources/exec/mod.rs @@ -207,7 +207,7 @@ const COMMAND_KEY: &str = "command"; impl_generate_config_from_default!(ExecConfig); impl ExecConfig { - fn validate(&self) -> Result<(), ExecConfigError> { + const fn validate(&self) -> Result<(), ExecConfigError> { if self.command.is_empty() { Err(ExecConfigError::CommandEmpty) } else if self.maximum_buffer_size_bytes == 0 { diff --git a/src/sources/kubernetes_logs/parser/cri.rs b/src/sources/kubernetes_logs/parser/cri.rs index 4d2f5da464401..81088251b0b79 100644 --- a/src/sources/kubernetes_logs/parser/cri.rs +++ b/src/sources/kubernetes_logs/parser/cri.rs @@ -143,7 +143,7 @@ const fn is_delimiter(c: &u8) -> bool { /// /// Equivalent to regex: `(?-u)^(?P.*) (?P(stdout|stderr)) (?P(P|F)) (?P.*)(?P\n?)$` #[inline] -fn parse_log_line(line: &[u8]) -> Option { +fn parse_log_line(line: &[u8]) -> Option> { let rest = line; let after_timestamp_pos = rest.iter().position(is_delimiter)?; diff --git a/src/sources/kubernetes_logs/transform_utils/mod.rs b/src/sources/kubernetes_logs/transform_utils/mod.rs index fe61b49f72a09..2c32c0d24fcbe 100644 --- a/src/sources/kubernetes_logs/transform_utils/mod.rs +++ b/src/sources/kubernetes_logs/transform_utils/mod.rs @@ -2,8 +2,6 @@ use vector_lib::config::{LogNamespace, log_schema}; use vrl::owned_value_path; use vrl::path::OwnedTargetPath; -pub mod optional; - pub(crate) fn get_message_path(log_namespace: LogNamespace) -> OwnedTargetPath { match log_namespace { LogNamespace::Vector => OwnedTargetPath::event(owned_value_path!()), diff --git a/src/sources/kubernetes_logs/transform_utils/optional.rs b/src/sources/kubernetes_logs/transform_utils/optional.rs deleted file mode 100644 index 56278bef8ace0..0000000000000 --- a/src/sources/kubernetes_logs/transform_utils/optional.rs +++ /dev/null @@ -1,32 +0,0 @@ -//! Optional transform. - -#![deny(missing_docs)] - -use std::pin::Pin; - -use futures::Stream; - -use crate::event::EventContainer; -use crate::transforms::TaskTransform; - -/// Optional transform. -/// Passes events through the specified transform is any, otherwise passes them, -/// as-is. -/// Useful to avoid boxing the transforms. -#[derive(Clone, Debug)] -pub struct Optional(pub Option); - -impl, E: EventContainer + 'static> TaskTransform for Optional { - fn transform( - self: Box, - task: Pin + Send>>, - ) -> Pin + Send>> - where - Self: 'static, - { - match self.0 { - Some(val) => Box::new(val).transform(task), - None => task, - } - } -} diff --git a/src/template.rs b/src/template.rs index 512b3df681d5a..e421c2ff3b140 100644 --- a/src/template.rs +++ b/src/template.rs @@ -236,7 +236,7 @@ impl Template { } /// Returns `true` if this template string has a length of zero, and `false` otherwise. - pub fn is_empty(&self) -> bool { + pub const fn is_empty(&self) -> bool { self.src.is_empty() } diff --git a/src/top/state.rs b/src/top/state.rs index 597ef6d7e8dcc..7790b56094eb0 100644 --- a/src/top/state.rs +++ b/src/top/state.rs @@ -59,7 +59,7 @@ pub enum ConnectionStatus { } impl ConnectionStatus { - pub fn as_ui_spans(&self) -> Vec { + pub fn as_ui_spans(&self) -> Vec> { match self { Self::Pending => vec![Span::styled( "Connecting...", diff --git a/src/topology/controller.rs b/src/topology/controller.rs index 6c89be1ca4ec9..1726130d1040a 100644 --- a/src/topology/controller.rs +++ b/src/topology/controller.rs @@ -17,11 +17,11 @@ impl SharedTopologyController { Self(Arc::new(Mutex::new(inner))) } - pub fn blocking_lock(&self) -> MutexGuard { + pub fn blocking_lock(&self) -> MutexGuard<'_, TopologyController> { self.0.blocking_lock() } - pub async fn lock(&self) -> MutexGuard { + pub async fn lock(&self) -> MutexGuard<'_, TopologyController> { self.0.lock().await } diff --git a/src/vector_windows.rs b/src/vector_windows.rs index fead3981e2fc1..dc29efec8170e 100644 --- a/src/vector_windows.rs +++ b/src/vector_windows.rs @@ -54,7 +54,7 @@ pub mod service_control { } } - const fn error_display(error: &windows_service::Error) -> ErrorDisplay { + const fn error_display(error: &windows_service::Error) -> ErrorDisplay<'_> { ErrorDisplay { error } }