From ce30ae49f314cc5dce4b04678f5f0c9cf17eb688 Mon Sep 17 00:00:00 2001 From: Matt Date: Sun, 22 Feb 2026 14:56:03 +0000 Subject: [PATCH] [FEATURE] AWS SD: Add Elasticache Role (#18099) * AWS SD: Elasticache This change adds Elasticache to the AWS SD. Co-authored-by: Ben Kochie Signed-off-by: Matt --------- Signed-off-by: Matt Co-authored-by: Ben Kochie --- cmd/prometheus/testdata/features.json | 1 + discovery/aws/aws.go | 54 +- discovery/aws/elasticache.go | 907 ++++++++++++++++++++++++++ discovery/aws/elasticache_test.go | 615 +++++++++++++++++ discovery/aws/metrics_elasticache.go | 32 + docs/configuration/configuration.md | 113 +++- go.mod | 1 + go.sum | 2 + 8 files changed, 1713 insertions(+), 12 deletions(-) create mode 100644 discovery/aws/elasticache.go create mode 100644 discovery/aws/elasticache_test.go create mode 100644 discovery/aws/metrics_elasticache.go diff --git a/cmd/prometheus/testdata/features.json b/cmd/prometheus/testdata/features.json index 5fc01aa195..ce7dbbaebe 100644 --- a/cmd/prometheus/testdata/features.json +++ b/cmd/prometheus/testdata/features.json @@ -186,6 +186,7 @@ "dockerswarm": true, "ec2": true, "ecs": true, + "elasticache": true, "eureka": true, "file": true, "gce": true, diff --git a/discovery/aws/aws.go b/discovery/aws/aws.go index 69b3b41c06..f0f9c3d4df 100644 --- a/discovery/aws/aws.go +++ b/discovery/aws/aws.go @@ -43,10 +43,11 @@ type Role string // The valid options for Role. const ( - RoleEC2 Role = "ec2" - RoleECS Role = "ecs" - RoleLightsail Role = "lightsail" - RoleMSK Role = "msk" + RoleEC2 Role = "ec2" + RoleECS Role = "ecs" + RoleElasticache Role = "elasticache" + RoleLightsail Role = "lightsail" + RoleMSK Role = "msk" ) // UnmarshalYAML implements the yaml.Unmarshaler interface. @@ -55,7 +56,7 @@ func (c *Role) UnmarshalYAML(unmarshal func(any) error) error { return err } switch *c { - case RoleEC2, RoleECS, RoleLightsail, RoleMSK: + case RoleEC2, RoleECS, RoleElasticache, RoleLightsail, RoleMSK: return nil default: return fmt.Errorf("unknown AWS SD role %q", *c) @@ -86,10 +87,11 @@ type SDConfig struct { Clusters []string `yaml:"clusters,omitempty"` // Embedded sub-configs (internal use only, not serialized) - *EC2SDConfig `yaml:"-"` - *ECSSDConfig `yaml:"-"` - *LightsailSDConfig `yaml:"-"` - *MSKSDConfig `yaml:"-"` + *EC2SDConfig `yaml:"-"` + *ECSSDConfig `yaml:"-"` + *ElasticacheSDConfig `yaml:"-"` + *LightsailSDConfig `yaml:"-"` + *MSKSDConfig `yaml:"-"` } // UnmarshalYAML implements the yaml.Unmarshaler interface for SDConfig. @@ -172,6 +174,37 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(any) error) error { if c.Clusters != nil { c.ECSSDConfig.Clusters = c.Clusters } + case RoleElasticache: + if c.ElasticacheSDConfig == nil { + elasticacheConfig := DefaultElasticacheSDConfig + c.ElasticacheSDConfig = &elasticacheConfig + } + c.ElasticacheSDConfig.HTTPClientConfig = c.HTTPClientConfig + c.ElasticacheSDConfig.Region = c.Region + if c.Endpoint != "" { + c.ElasticacheSDConfig.Endpoint = c.Endpoint + } + if c.AccessKey != "" { + c.ElasticacheSDConfig.AccessKey = c.AccessKey + } + if c.SecretKey != "" { + c.ElasticacheSDConfig.SecretKey = c.SecretKey + } + if c.Profile != "" { + c.ElasticacheSDConfig.Profile = c.Profile + } + if c.RoleARN != "" { + c.ElasticacheSDConfig.RoleARN = c.RoleARN + } + if c.Port != 0 { + c.ElasticacheSDConfig.Port = c.Port + } + if c.RefreshInterval != 0 { + c.ElasticacheSDConfig.RefreshInterval = c.RefreshInterval + } + if c.Clusters != nil { + c.ElasticacheSDConfig.Clusters = c.Clusters + } case RoleLightsail: if c.LightsailSDConfig == nil { lightsailConfig := DefaultLightsailSDConfig @@ -259,6 +292,9 @@ func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Di case RoleECS: opts.Metrics = &ecsMetrics{refreshMetrics: awsMetrics.refreshMetrics} return NewECSDiscovery(c.ECSSDConfig, opts) + case RoleElasticache: + opts.Metrics = &elasticacheMetrics{refreshMetrics: awsMetrics.refreshMetrics} + return NewElasticacheDiscovery(c.ElasticacheSDConfig, opts) case RoleLightsail: opts.Metrics = &lightsailMetrics{refreshMetrics: awsMetrics.refreshMetrics} return NewLightsailDiscovery(c.LightsailSDConfig, opts) diff --git a/discovery/aws/elasticache.go b/discovery/aws/elasticache.go new file mode 100644 index 0000000000..7ed598e294 --- /dev/null +++ b/discovery/aws/elasticache.go @@ -0,0 +1,907 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aws + +import ( + "context" + "errors" + "fmt" + "log/slog" + "maps" + "net" + "strconv" + "strings" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + awsConfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/credentials/stscreds" + "github.com/aws/aws-sdk-go-v2/service/elasticache" + "github.com/aws/aws-sdk-go-v2/service/elasticache/types" + "github.com/aws/aws-sdk-go-v2/service/sts" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/common/promslog" + "golang.org/x/sync/errgroup" + + "github.com/prometheus/prometheus/discovery" + "github.com/prometheus/prometheus/discovery/refresh" + "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/util/strutil" +) + +const ( + elasticacheLabel = model.MetaLabelPrefix + "elasticache_" + elasticacheLabelDeploymentOption = elasticacheLabel + "deployment_option" + + // cache cluster. + elasticacheLabelCacheCluster = elasticacheLabel + "cache_cluster_" + elasticacheLabelCacheClusterARN = elasticacheLabelCacheCluster + "arn" + elasticacheLabelCacheClusterAtRestEncryptionEnabled = elasticacheLabelCacheCluster + "at_rest_encryption_enabled" + elasticacheLabelCacheClusterAuthTokenEnabled = elasticacheLabelCacheCluster + "auth_token_enabled" + elasticacheLabelCacheClusterAuthTokenLastModified = elasticacheLabelCacheCluster + "auth_token_last_modified" + elasticacheLabelCacheClusterAutoMinorVersionUpgrade = elasticacheLabelCacheCluster + "auto_minor_version_upgrade" + elasticacheLabelCacheClusterCreateTime = elasticacheLabelCacheCluster + "cache_cluster_create_time" + elasticacheLabelCacheClusterID = elasticacheLabelCacheCluster + "cache_cluster_id" + elasticacheLabelCacheClusterStatus = elasticacheLabelCacheCluster + "cache_cluster_status" + elasticacheLabelCacheClusterNodeType = elasticacheLabelCacheCluster + "cache_node_type" + elasticacheLabelCacheClusterParameterGroup = elasticacheLabelCacheCluster + "cache_parameter_group" + elasticacheLabelCacheClusterSubnetGroupName = elasticacheLabelCacheCluster + "cache_subnet_group_name" + elasticacheLabelCacheClusterClientDownloadLandingPage = elasticacheLabelCacheCluster + "client_download_landing_page" + elasticacheLabelCacheClusterEngine = elasticacheLabelCacheCluster + "engine" + elasticacheLabelCacheClusterEngineVersion = elasticacheLabelCacheCluster + "engine_version" + elasticacheLabelCacheClusterIPDiscovery = elasticacheLabelCacheCluster + "ip_discovery" + elasticacheLabelCacheClusterNetworkType = elasticacheLabelCacheCluster + "network_type" + elasticacheLabelCacheClusterNumCacheNodes = elasticacheLabelCacheCluster + "num_cache_nodes" + elasticacheLabelCacheClusterPreferredAvailabilityZone = elasticacheLabelCacheCluster + "preferred_availability_zone" + elasticacheLabelCacheClusterPreferredMaintenanceWindow = elasticacheLabelCacheCluster + "preferred_maintenance_window" + elasticacheLabelCacheClusterPreferredOutpostARN = elasticacheLabelCacheCluster + "preferred_outpost_arn" + elasticacheLabelCacheClusterReplicationGroupID = elasticacheLabelCacheCluster + "replication_group_id" + elasticacheLabelCacheClusterReplicationGroupLogDeliveryEnabled = elasticacheLabelCacheCluster + "replication_group_log_delivery_enabled" + elasticacheLabelCacheClusterSnapshotRetentionLimit = elasticacheLabelCacheCluster + "snapshot_retention_limit" + elasticacheLabelCacheClusterSnapshotWindow = elasticacheLabelCacheCluster + "snapshot_window" + elasticacheLabelCacheClusterTransitEncryptionEnabled = elasticacheLabelCacheCluster + "transit_encryption_enabled" + elasticacheLabelCacheClusterTransitEncryptionMode = elasticacheLabelCacheCluster + "transit_encryption_mode" + + // configuration endpoint. + elasticacheLabelCacheClusterConfigurationEndpoint = elasticacheLabelCacheCluster + "configuration_endpoint_" + elasticacheLabelCacheClusterConfigurationEndpointAddress = elasticacheLabelCacheClusterConfigurationEndpoint + "address" + elasticacheLabelCacheClusterConfigurationEndpointPort = elasticacheLabelCacheClusterConfigurationEndpoint + "port" + + // notification. + elasticacheLabelCacheClusterNotification = elasticacheLabelCacheCluster + "notification_" + elasticacheLabelCacheClusterNotificationTopicARN = elasticacheLabelCacheClusterNotification + "topic_arn" + elasticacheLabelCacheClusterNotificationTopicStatus = elasticacheLabelCacheClusterNotification + "topic_status" + + // log delivery configuration (slice - use with index). + elasticacheLabelCacheClusterLogDeliveryConfiguration = elasticacheLabelCacheCluster + "log_delivery_configuration_" + elasticacheLabelCacheClusterLogDeliveryConfigurationDestinationType = elasticacheLabelCacheClusterLogDeliveryConfiguration + "destination_type" + elasticacheLabelCacheClusterLogDeliveryConfigurationLogFormat = elasticacheLabelCacheClusterLogDeliveryConfiguration + "log_format" + elasticacheLabelCacheClusterLogDeliveryConfigurationLogType = elasticacheLabelCacheClusterLogDeliveryConfiguration + "log_type" + elasticacheLabelCacheClusterLogDeliveryConfigurationStatus = elasticacheLabelCacheClusterLogDeliveryConfiguration + "status" + elasticacheLabelCacheClusterLogDeliveryConfigurationMessage = elasticacheLabelCacheClusterLogDeliveryConfiguration + "message" + elasticacheLabelCacheClusterLogDeliveryConfigurationLogGroup = elasticacheLabelCacheClusterLogDeliveryConfiguration + "log_group" + elasticacheLabelCacheClusterLogDeliveryConfigurationDeliveryStream = elasticacheLabelCacheClusterLogDeliveryConfiguration + "delivery_stream" + + // pending modified values. + elasticacheLabelCacheClusterPendingModifiedValues = elasticacheLabelCacheCluster + "pending_modified_values_" + elasticacheLabelCacheClusterPendingModifiedValuesAuthTokenStatus = elasticacheLabelCacheClusterPendingModifiedValues + "auth_token_status" + elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeType = elasticacheLabelCacheClusterPendingModifiedValues + "cache_node_type" + elasticacheLabelCacheClusterPendingModifiedValuesEngineVersion = elasticacheLabelCacheClusterPendingModifiedValues + "engine_version" + elasticacheLabelCacheClusterPendingModifiedValuesNumCacheNodes = elasticacheLabelCacheClusterPendingModifiedValues + "num_cache_nodes" + elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionEnabled = elasticacheLabelCacheClusterPendingModifiedValues + "transit_encryption_enabled" + elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionMode = elasticacheLabelCacheClusterPendingModifiedValues + "transit_encryption_mode" + elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeIDsToRemove = elasticacheLabelCacheClusterPendingModifiedValues + "cache_node_ids_to_remove" + + // security group membership (slice - use with index). + elasticacheLabelCacheClusterSecurityGroupMembership = elasticacheLabelCacheCluster + "security_group_membership_" + elasticacheLabelCacheClusterSecurityGroupMembershipID = elasticacheLabelCacheClusterSecurityGroupMembership + "id" + elasticacheLabelCacheClusterSecurityGroupMembershipStatus = elasticacheLabelCacheClusterSecurityGroupMembership + "status" + + // tags - create one label per tag key, with the format: elasticache_cache_cluster_tag_. + elasticacheLabelCacheClusterTag = elasticacheLabelCacheCluster + "tag_" + + // node. + elasticacheLabelCacheClusterNode = elasticacheLabelCacheCluster + "node_" + elasticacheLabelCacheClusterNodeCreateTime = elasticacheLabelCacheClusterNode + "create_time" + elasticacheLabelCacheClusterNodeID = elasticacheLabelCacheClusterNode + "id" + elasticacheLabelCacheClusterNodeStatus = elasticacheLabelCacheClusterNode + "status" + elasticacheLabelCacheClusterNodeAZ = elasticacheLabelCacheClusterNode + "availability_zone" + elasticacheLabelCacheClusterNodeCustomerOutpostARN = elasticacheLabelCacheClusterNode + "customer_outpost_arn" + elasticacheLabelCacheClusterNodeSourceCacheNodeID = elasticacheLabelCacheClusterNode + "source_cache_node_id" + elasticacheLabelCacheClusterNodeParameterGroupStatus = elasticacheLabelCacheClusterNode + "parameter_group_status" + + // endpoint. + elasticacheLabelCacheClusterNodeEndpoint = elasticacheLabelCacheClusterNode + "endpoint_" + elasticacheLabelCacheClusterNodeEndpointAddress = elasticacheLabelCacheClusterNodeEndpoint + "address" + elasticacheLabelCacheClusterNodeEndpointPort = elasticacheLabelCacheClusterNodeEndpoint + "port" + + // serverless cache. + elasticacheLabelServerlessCache = elasticacheLabel + "serverless_cache_" + elasticacheLabelServerlessCacheARN = elasticacheLabelServerlessCache + "arn" + elasticacheLabelServerlessCacheName = elasticacheLabelServerlessCache + "name" + elasticacheLabelServerlessCacheCreateTime = elasticacheLabelServerlessCache + "create_time" + elasticacheLabelServerlessCacheDescription = elasticacheLabelServerlessCache + "description" + elasticacheLabelServerlessCacheEngine = elasticacheLabelServerlessCache + "engine" + elasticacheLabelServerlessCacheFullEngineVersion = elasticacheLabelServerlessCache + "full_engine_version" + elasticacheLabelServerlessCacheMajorEngineVersion = elasticacheLabelServerlessCache + "major_engine_version" + elasticacheLabelServerlessCacheStatus = elasticacheLabelServerlessCache + "status" + elasticacheLabelServerlessCacheKmsKeyID = elasticacheLabelServerlessCache + "kms_key_id" + elasticacheLabelServerlessCacheUserGroupID = elasticacheLabelServerlessCache + "user_group_id" + elasticacheLabelServerlessCacheDailySnapshotTime = elasticacheLabelServerlessCache + "daily_snapshot_time" + elasticacheLabelServerlessCacheSnapshotRetentionLimit = elasticacheLabelServerlessCache + "snapshot_retention_limit" + + // endpoint. + elasticacheLabelServerlessCacheEndpoint = elasticacheLabelServerlessCache + "endpoint_" + elasticacheLabelServerlessCacheEndpointAddress = elasticacheLabelServerlessCacheEndpoint + "address" + elasticacheLabelServerlessCacheEndpointPort = elasticacheLabelServerlessCacheEndpoint + "port" + elasticacheLabelServerlessCacheReaderEndpointAddress = elasticacheLabelServerlessCacheEndpoint + "reader_address" + elasticacheLabelServerlessCacheReaderEndpointPort = elasticacheLabelServerlessCacheEndpoint + "reader_port" + + // security group membership (slice - use with index). + elasticacheLabelServerlessCacheSecurityGroupID = elasticacheLabelServerlessCache + "security_group_id" + + // Subnet group membership (slice - use with index). + elasticacheLabelServerlessCacheSubnetID = elasticacheLabelServerlessCache + "subnet_id" + + // cache usage limits. + elasticacheLabelServerlessCacheCacheUsageLimit = elasticacheLabelServerlessCache + "cache_usage_limit_" + elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage = elasticacheLabelServerlessCacheCacheUsageLimit + "data_storage" + elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMaximum = elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage + "maximum" + elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMinimum = elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage + "minimum" + elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageUnit = elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage + "unit" + elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecond = elasticacheLabelServerlessCacheCacheUsageLimit + "ecpu_per_second" + elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMaximum = elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecond + "maximum" + elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMinimum = elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecond + "minimum" + + // tags - create one label per tag key, with the format: elasticache_serverless_cache_tag_. + elasticacheLabelServerlessCacheTag = elasticacheLabelServerlessCache + "tag_" +) + +// DefaultElasticacheSDConfig is the default Elasticache SD configuration. +var DefaultElasticacheSDConfig = ElasticacheSDConfig{ + Port: 80, + RefreshInterval: model.Duration(60 * time.Second), + RequestConcurrency: 10, + HTTPClientConfig: config.DefaultHTTPClientConfig, +} + +func init() { + discovery.RegisterConfig(&ElasticacheSDConfig{}) +} + +// ElasticacheSDConfig is the configuration for Elasticache based service discovery. +type ElasticacheSDConfig struct { + Region string `yaml:"region"` + Endpoint string `yaml:"endpoint"` + AccessKey string `yaml:"access_key,omitempty"` + SecretKey config.Secret `yaml:"secret_key,omitempty"` + Profile string `yaml:"profile,omitempty"` + RoleARN string `yaml:"role_arn,omitempty"` + Clusters []string `yaml:"clusters,omitempty"` + Port int `yaml:"port"` + RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"` + + // RequestConcurrency controls the maximum number of concurrent Elasticache API requests. + RequestConcurrency int `yaml:"request_concurrency,omitempty"` + + HTTPClientConfig config.HTTPClientConfig `yaml:",inline"` +} + +// NewDiscovererMetrics implements discovery.Config. +func (*ElasticacheSDConfig) NewDiscovererMetrics(_ prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics { + return &elasticacheMetrics{ + refreshMetrics: rmi, + } +} + +// Name returns the name of the Elasticache Config. +func (*ElasticacheSDConfig) Name() string { return "elasticache" } + +// NewDiscoverer returns a Discoverer for the Elasticache Config. +func (c *ElasticacheSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) { + return NewElasticacheDiscovery(c, opts) +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface for the Elasticache Config. +func (c *ElasticacheSDConfig) UnmarshalYAML(unmarshal func(any) error) error { + *c = DefaultElasticacheSDConfig + type plain ElasticacheSDConfig + err := unmarshal((*plain)(c)) + if err != nil { + return err + } + + c.Region, err = loadRegion(context.Background(), c.Region) + if err != nil { + return fmt.Errorf("could not determine AWS region: %w", err) + } + + return c.HTTPClientConfig.Validate() +} + +type elasticacheClient interface { + DescribeServerlessCaches(ctx context.Context, params *elasticache.DescribeServerlessCachesInput, optFns ...func(*elasticache.Options)) (*elasticache.DescribeServerlessCachesOutput, error) + DescribeCacheClusters(ctx context.Context, params *elasticache.DescribeCacheClustersInput, optFns ...func(*elasticache.Options)) (*elasticache.DescribeCacheClustersOutput, error) + ListTagsForResource(ctx context.Context, params *elasticache.ListTagsForResourceInput, optFns ...func(*elasticache.Options)) (*elasticache.ListTagsForResourceOutput, error) +} + +// ElasticacheDiscovery periodically performs Elasticache-SD requests. +// It implements the Discoverer interface. +type ElasticacheDiscovery struct { + *refresh.Discovery + logger *slog.Logger + cfg *ElasticacheSDConfig + elasticacheClient elasticacheClient +} + +// NewElasticacheDiscovery returns a new ElasticacheDiscovery which periodically refreshes its targets. +func NewElasticacheDiscovery(conf *ElasticacheSDConfig, opts discovery.DiscovererOptions) (*ElasticacheDiscovery, error) { + m, ok := opts.Metrics.(*elasticacheMetrics) + if !ok { + return nil, errors.New("invalid discovery metrics type") + } + + if opts.Logger == nil { + opts.Logger = promslog.NewNopLogger() + } + d := &ElasticacheDiscovery{ + logger: opts.Logger, + cfg: conf, + } + d.Discovery = refresh.NewDiscovery( + refresh.Options{ + Logger: opts.Logger, + Mech: "elasticache", + Interval: time.Duration(d.cfg.RefreshInterval), + RefreshF: d.refresh, + MetricsInstantiator: m.refreshMetrics, + }, + ) + return d, nil +} + +func (d *ElasticacheDiscovery) initElasticacheClient(ctx context.Context) error { + if d.elasticacheClient != nil { + return nil + } + + if d.cfg.Region == "" { + return errors.New("region must be set for Elasticache service discovery") + } + + // Build the HTTP client from the provided HTTPClientConfig. + client, err := config.NewClientFromConfig(d.cfg.HTTPClientConfig, "elasticache_sd") + if err != nil { + return err + } + + // Build the AWS config with the provided region. + var configOptions []func(*awsConfig.LoadOptions) error + configOptions = append(configOptions, awsConfig.WithRegion(d.cfg.Region)) + configOptions = append(configOptions, awsConfig.WithHTTPClient(client)) + + // Only set static credentials if both access key and secret key are provided + // Otherwise, let AWS SDK use its default credential chain + if d.cfg.AccessKey != "" && d.cfg.SecretKey != "" { + credProvider := credentials.NewStaticCredentialsProvider(d.cfg.AccessKey, string(d.cfg.SecretKey), "") + configOptions = append(configOptions, awsConfig.WithCredentialsProvider(credProvider)) + } + + if d.cfg.Profile != "" { + configOptions = append(configOptions, awsConfig.WithSharedConfigProfile(d.cfg.Profile)) + } + + cfg, err := awsConfig.LoadDefaultConfig(ctx, configOptions...) + if err != nil { + d.logger.Error("Failed to create AWS config", "error", err) + return fmt.Errorf("could not create aws config: %w", err) + } + + // If the role ARN is set, assume the role to get credentials and set the credentials provider in the config. + if d.cfg.RoleARN != "" { + assumeProvider := stscreds.NewAssumeRoleProvider(sts.NewFromConfig(cfg), d.cfg.RoleARN) + cfg.Credentials = aws.NewCredentialsCache(assumeProvider) + } + + d.elasticacheClient = elasticache.NewFromConfig(cfg, func(options *elasticache.Options) { + if d.cfg.Endpoint != "" { + options.BaseEndpoint = &d.cfg.Endpoint + } + options.HTTPClient = client + }) + + // Test credentials by making a simple API call + testCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + _, err = d.elasticacheClient.DescribeCacheClusters(testCtx, &elasticache.DescribeCacheClustersInput{}) + if err != nil { + d.logger.Error("Failed to test Elasticache credentials", "error", err) + return fmt.Errorf("elasticache credential test failed: %w", err) + } + + return nil +} + +// describeServerlessCaches calls DescribeServerlessCaches API for the given cache IDs (or all caches if no IDs are provided) and returns the list of serverless caches. +func (d *ElasticacheDiscovery) describeServerlessCaches(ctx context.Context, caches []string) ([]types.ServerlessCache, error) { + mu := &sync.Mutex{} + errg, ectx := errgroup.WithContext(ctx) + errg.SetLimit(d.cfg.RequestConcurrency) + var serverlessCaches []types.ServerlessCache + if len(caches) == 0 { + errg.Go(func() error { + var nextToken *string + for { + output, err := d.elasticacheClient.DescribeServerlessCaches(ectx, &elasticache.DescribeServerlessCachesInput{ + MaxResults: aws.Int32(50), + NextToken: nextToken, + }) + if err != nil { + return fmt.Errorf("failed to describe serverless caches: %w", err) + } + mu.Lock() + serverlessCaches = append(serverlessCaches, output.ServerlessCaches...) + mu.Unlock() + if output.NextToken == nil { + break + } + nextToken = output.NextToken + } + return nil + }) + } else { + for _, cacheID := range caches { + errg.Go(func() error { + output, err := d.elasticacheClient.DescribeServerlessCaches(ectx, &elasticache.DescribeServerlessCachesInput{ + MaxResults: aws.Int32(50), + NextToken: nil, + ServerlessCacheName: aws.String(cacheID), + }) + if err != nil { + return fmt.Errorf("failed to describe serverless cache %s: %w", cacheID, err) + } + mu.Lock() + serverlessCaches = append(serverlessCaches, output.ServerlessCaches...) + mu.Unlock() + return nil + }) + } + } + + return serverlessCaches, errg.Wait() +} + +// describeCacheClusters calls DescribeCacheClusters API for the given cache cluster IDs (or all cache clusters if no IDs are provided) and returns the list of cache clusters. +func (d *ElasticacheDiscovery) describeCacheClusters(ctx context.Context, caches []string) ([]types.CacheCluster, error) { + mu := &sync.Mutex{} + errg, ectx := errgroup.WithContext(ctx) + errg.SetLimit(d.cfg.RequestConcurrency) + showCacheClustersNotInReplicationGroupsBools := []bool{false, true} + var cacheClusters []types.CacheCluster + if len(caches) == 0 { + for _, showCacheClustersNotInReplicationGroupsBool := range showCacheClustersNotInReplicationGroupsBools { + errg.Go(func() error { + var nextToken *string + for { + output, err := d.elasticacheClient.DescribeCacheClusters(ectx, &elasticache.DescribeCacheClustersInput{ + MaxRecords: aws.Int32(100), + Marker: nextToken, + ShowCacheNodeInfo: aws.Bool(true), + ShowCacheClustersNotInReplicationGroups: aws.Bool(showCacheClustersNotInReplicationGroupsBool), + }) + if err != nil { + return fmt.Errorf("failed to describe cache clusters: %w", err) + } + mu.Lock() + cacheClusters = append(cacheClusters, output.CacheClusters...) + mu.Unlock() + if output.Marker == nil { + break + } + nextToken = output.Marker + } + return nil + }) + } + } else { + for _, cacheID := range caches { + for _, showCacheClustersNotInReplicationGroupsBool := range showCacheClustersNotInReplicationGroupsBools { + errg.Go(func() error { + output, err := d.elasticacheClient.DescribeCacheClusters(ectx, &elasticache.DescribeCacheClustersInput{ + MaxRecords: aws.Int32(100), + Marker: nil, + ShowCacheNodeInfo: aws.Bool(true), + ShowCacheClustersNotInReplicationGroups: aws.Bool(showCacheClustersNotInReplicationGroupsBool), + CacheClusterId: aws.String(cacheID), + }) + if err != nil { + return fmt.Errorf("failed to describe cache cluster %s: %w", cacheID, err) + } + mu.Lock() + cacheClusters = append(cacheClusters, output.CacheClusters...) + mu.Unlock() + return nil + }) + } + } + } + + return cacheClusters, errg.Wait() +} + +// listTagsForResource calls ListTagsForResource API for the given resource ARNs and returns a map of resource ARN to list of tags. +func (d *ElasticacheDiscovery) listTagsForResource(ctx context.Context, resourceARNs []string) (map[string][]types.Tag, error) { + mu := &sync.Mutex{} + errg, ectx := errgroup.WithContext(ctx) + errg.SetLimit(d.cfg.RequestConcurrency) + tagsByResourceARN := make(map[string][]types.Tag) + for _, resourceARN := range resourceARNs { + errg.Go(func() error { + output, err := d.elasticacheClient.ListTagsForResource(ectx, &elasticache.ListTagsForResourceInput{ + ResourceName: aws.String(resourceARN), + }) + if err != nil { + return fmt.Errorf("failed to list tags for resource %s: %w", resourceARN, err) + } + mu.Lock() + tagsByResourceARN[resourceARN] = output.TagList + mu.Unlock() + return nil + }) + } + return tagsByResourceARN, errg.Wait() +} + +func (d *ElasticacheDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) { + err := d.initElasticacheClient(ctx) + if err != nil { + return nil, err + } + + var clusters []string + clustersMu := sync.Mutex{} + serverlessCacheIDs, cacheClusterIDs := splitCacheDeploymentOptions(d.cfg.Clusters) + + clusterErrg, clusterCtx := errgroup.WithContext(ctx) + clusterErrg.Go(func() error { + caches, err := d.describeServerlessCaches(clusterCtx, serverlessCacheIDs) + if err != nil { + return fmt.Errorf("failed to describe serverless caches: %w", err) + } + for _, cache := range caches { + clustersMu.Lock() + clusters = append(clusters, *cache.ARN) + clustersMu.Unlock() + } + return nil + }) + + clusterErrg.Go(func() error { + cacheClusters, err := d.describeCacheClusters(clusterCtx, cacheClusterIDs) + if err != nil { + return fmt.Errorf("failed to describe cache clusters: %w", err) + } + for _, cluster := range cacheClusters { + clustersMu.Lock() + clusters = append(clusters, *cluster.ARN) + clustersMu.Unlock() + } + return nil + }) + + if err := clusterErrg.Wait(); err != nil { + return nil, err + } + + tagsByResourceARN, err := d.listTagsForResource(ctx, clusters) + if err != nil { + return nil, fmt.Errorf("failed to list tags for resources: %w", err) + } + + tg := &targetgroup.Group{ + Source: d.cfg.Region, + } + + errg, ectx := errgroup.WithContext(ctx) + errg.Go(func() error { + caches, err := d.describeServerlessCaches(ectx, serverlessCacheIDs) + if err != nil { + return fmt.Errorf("failed to describe serverless caches: %w", err) + } + for _, cache := range caches { + addServerlessCacheTargets(tg, &cache, tagsByResourceARN[*cache.ARN]) + } + return nil + }) + + errg.Go(func() error { + cacheClusters, err := d.describeCacheClusters(ectx, cacheClusterIDs) + if err != nil { + return fmt.Errorf("failed to describe cache clusters: %w", err) + } + for _, cluster := range cacheClusters { + addCacheClusterTargets(tg, &cluster, tagsByResourceARN[*cluster.ARN]) + } + return nil + }) + + if err := errg.Wait(); err != nil { + return nil, err + } + + return []*targetgroup.Group{tg}, nil +} + +// splitCacheTypes takes a list of cache ARNs and splits them into serverless cache IDs and cache cluster IDs based on their format. +// Serverless caches are in the format arn:aws:elasticache:::serverlesscache: +// Cache clusters are in the format arn:aws:elasticache:::replicationgroup:. +func splitCacheDeploymentOptions(caches []string) (serverlessCacheIDs, cacheClusterIDs []string) { + for _, cacheARN := range caches { + if len(cacheARN) == 0 { + continue + } + parts := strings.Split(cacheARN, ":") + if len(parts) < 6 { + continue + } + resourceType := parts[5] + resourceID := parts[6] + switch resourceType { + case "serverlesscache": + serverlessCacheIDs = append(serverlessCacheIDs, resourceID) + case "replicationgroup": + cacheClusterIDs = append(cacheClusterIDs, resourceID) + default: + continue + } + } + return serverlessCacheIDs, cacheClusterIDs +} + +// addServerlessCacheTargets adds targets for a serverless cache to the target group. +func addServerlessCacheTargets(tg *targetgroup.Group, cache *types.ServerlessCache, tags []types.Tag) { + labels := model.LabelSet{ + elasticacheLabelDeploymentOption: model.LabelValue("serverless"), + elasticacheLabelServerlessCacheARN: model.LabelValue(*cache.ARN), + elasticacheLabelServerlessCacheName: model.LabelValue(*cache.ServerlessCacheName), + elasticacheLabelServerlessCacheStatus: model.LabelValue(*cache.Status), + elasticacheLabelServerlessCacheEngine: model.LabelValue(*cache.Engine), + elasticacheLabelServerlessCacheFullEngineVersion: model.LabelValue(*cache.FullEngineVersion), + elasticacheLabelServerlessCacheMajorEngineVersion: model.LabelValue(*cache.MajorEngineVersion), + } + + if cache.Description != nil { + labels[elasticacheLabelServerlessCacheDescription] = model.LabelValue(*cache.Description) + } + + if cache.CreateTime != nil { + labels[elasticacheLabelServerlessCacheCreateTime] = model.LabelValue(cache.CreateTime.Format(time.RFC3339)) + } + + if cache.KmsKeyId != nil { + labels[elasticacheLabelServerlessCacheKmsKeyID] = model.LabelValue(*cache.KmsKeyId) + } + + if cache.UserGroupId != nil { + labels[elasticacheLabelServerlessCacheUserGroupID] = model.LabelValue(*cache.UserGroupId) + } + + if cache.DailySnapshotTime != nil { + labels[elasticacheLabelServerlessCacheDailySnapshotTime] = model.LabelValue(*cache.DailySnapshotTime) + } + + if cache.SnapshotRetentionLimit != nil { + labels[elasticacheLabelServerlessCacheSnapshotRetentionLimit] = model.LabelValue(strconv.Itoa(int(*cache.SnapshotRetentionLimit))) + } + + if cache.Endpoint != nil { + if cache.Endpoint.Address != nil { + labels[elasticacheLabelServerlessCacheEndpointAddress] = model.LabelValue(*cache.Endpoint.Address) + } + if cache.Endpoint.Port != nil { + labels[elasticacheLabelServerlessCacheEndpointPort] = model.LabelValue(strconv.Itoa(int(*cache.Endpoint.Port))) + } + } + + if cache.ReaderEndpoint != nil { + if cache.ReaderEndpoint.Address != nil { + labels[elasticacheLabelServerlessCacheReaderEndpointAddress] = model.LabelValue(*cache.ReaderEndpoint.Address) + } + if cache.ReaderEndpoint.Port != nil { + labels[elasticacheLabelServerlessCacheReaderEndpointPort] = model.LabelValue(strconv.Itoa(int(*cache.ReaderEndpoint.Port))) + } + } + + for i, sgID := range cache.SecurityGroupIds { + labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelServerlessCacheSecurityGroupID, i))] = model.LabelValue(sgID) + } + + for i, subnetID := range cache.SubnetIds { + labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelServerlessCacheSubnetID, i))] = model.LabelValue(subnetID) + } + + if cache.CacheUsageLimits != nil { + if cache.CacheUsageLimits.DataStorage != nil { + if cache.CacheUsageLimits.DataStorage.Maximum != nil { + labels[elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMaximum] = model.LabelValue(strconv.Itoa(int(*cache.CacheUsageLimits.DataStorage.Maximum))) + } + if cache.CacheUsageLimits.DataStorage.Minimum != nil { + labels[elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMinimum] = model.LabelValue(strconv.Itoa(int(*cache.CacheUsageLimits.DataStorage.Minimum))) + } + labels[elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageUnit] = model.LabelValue(cache.CacheUsageLimits.DataStorage.Unit) + } + if cache.CacheUsageLimits.ECPUPerSecond != nil { + if cache.CacheUsageLimits.ECPUPerSecond.Maximum != nil { + labels[elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMaximum] = model.LabelValue(strconv.Itoa(int(*cache.CacheUsageLimits.ECPUPerSecond.Maximum))) + } + if cache.CacheUsageLimits.ECPUPerSecond.Minimum != nil { + labels[elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMinimum] = model.LabelValue(strconv.Itoa(int(*cache.CacheUsageLimits.ECPUPerSecond.Minimum))) + } + } + } + + for _, tag := range tags { + if tag.Key != nil && tag.Value != nil { + labels[model.LabelName(elasticacheLabelServerlessCacheTag+strutil.SanitizeLabelName(*tag.Key))] = model.LabelValue(*tag.Value) + } + } + + // Set the address label using the endpoint + if cache.Endpoint != nil && cache.Endpoint.Address != nil && cache.Endpoint.Port != nil { + labels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(*cache.Endpoint.Address, strconv.Itoa(int(*cache.Endpoint.Port)))) + } + + tg.Targets = append(tg.Targets, labels) +} + +// addCacheClusterTargets adds targets for a cache cluster to the target group. +// Creates one target per cache node for individual scraping. +func addCacheClusterTargets(tg *targetgroup.Group, cluster *types.CacheCluster, tags []types.Tag) { + // Build common labels that apply to all nodes in this cluster + commonLabels := model.LabelSet{ + elasticacheLabelDeploymentOption: model.LabelValue("node"), + elasticacheLabelCacheClusterARN: model.LabelValue(*cluster.ARN), + elasticacheLabelCacheClusterID: model.LabelValue(*cluster.CacheClusterId), + elasticacheLabelCacheClusterStatus: model.LabelValue(*cluster.CacheClusterStatus), + } + + if cluster.AtRestEncryptionEnabled != nil { + commonLabels[elasticacheLabelCacheClusterAtRestEncryptionEnabled] = model.LabelValue(strconv.FormatBool(*cluster.AtRestEncryptionEnabled)) + } + + if cluster.AuthTokenEnabled != nil { + commonLabels[elasticacheLabelCacheClusterAuthTokenEnabled] = model.LabelValue(strconv.FormatBool(*cluster.AuthTokenEnabled)) + } + + if cluster.AuthTokenLastModifiedDate != nil { + commonLabels[elasticacheLabelCacheClusterAuthTokenLastModified] = model.LabelValue(cluster.AuthTokenLastModifiedDate.Format(time.RFC3339)) + } + + if cluster.AutoMinorVersionUpgrade != nil { + commonLabels[elasticacheLabelCacheClusterAutoMinorVersionUpgrade] = model.LabelValue(strconv.FormatBool(*cluster.AutoMinorVersionUpgrade)) + } + + if cluster.CacheClusterCreateTime != nil { + commonLabels[elasticacheLabelCacheClusterCreateTime] = model.LabelValue(cluster.CacheClusterCreateTime.Format(time.RFC3339)) + } + + if cluster.CacheNodeType != nil { + commonLabels[elasticacheLabelCacheClusterNodeType] = model.LabelValue(*cluster.CacheNodeType) + } + + if cluster.CacheParameterGroup != nil && cluster.CacheParameterGroup.CacheParameterGroupName != nil { + commonLabels[elasticacheLabelCacheClusterParameterGroup] = model.LabelValue(*cluster.CacheParameterGroup.CacheParameterGroupName) + } + + if cluster.CacheSubnetGroupName != nil { + commonLabels[elasticacheLabelCacheClusterSubnetGroupName] = model.LabelValue(*cluster.CacheSubnetGroupName) + } + + if cluster.ClientDownloadLandingPage != nil { + commonLabels[elasticacheLabelCacheClusterClientDownloadLandingPage] = model.LabelValue(*cluster.ClientDownloadLandingPage) + } + + if cluster.ConfigurationEndpoint != nil { + if cluster.ConfigurationEndpoint.Address != nil { + commonLabels[elasticacheLabelCacheClusterConfigurationEndpointAddress] = model.LabelValue(*cluster.ConfigurationEndpoint.Address) + } + if cluster.ConfigurationEndpoint.Port != nil { + commonLabels[elasticacheLabelCacheClusterConfigurationEndpointPort] = model.LabelValue(strconv.Itoa(int(*cluster.ConfigurationEndpoint.Port))) + } + } + + if cluster.Engine != nil { + commonLabels[elasticacheLabelCacheClusterEngine] = model.LabelValue(*cluster.Engine) + } + + if cluster.EngineVersion != nil { + commonLabels[elasticacheLabelCacheClusterEngineVersion] = model.LabelValue(*cluster.EngineVersion) + } + + if len(cluster.IpDiscovery) > 0 { + commonLabels[elasticacheLabelCacheClusterIPDiscovery] = model.LabelValue(cluster.IpDiscovery) + } + + if len(cluster.NetworkType) > 0 { + commonLabels[elasticacheLabelCacheClusterNetworkType] = model.LabelValue(cluster.NetworkType) + } + + if cluster.NotificationConfiguration != nil { + if cluster.NotificationConfiguration.TopicArn != nil { + commonLabels[elasticacheLabelCacheClusterNotificationTopicARN] = model.LabelValue(*cluster.NotificationConfiguration.TopicArn) + } + if cluster.NotificationConfiguration.TopicStatus != nil { + commonLabels[elasticacheLabelCacheClusterNotificationTopicStatus] = model.LabelValue(*cluster.NotificationConfiguration.TopicStatus) + } + } + + if cluster.NumCacheNodes != nil { + commonLabels[elasticacheLabelCacheClusterNumCacheNodes] = model.LabelValue(strconv.Itoa(int(*cluster.NumCacheNodes))) + } + + if cluster.PreferredAvailabilityZone != nil { + commonLabels[elasticacheLabelCacheClusterPreferredAvailabilityZone] = model.LabelValue(*cluster.PreferredAvailabilityZone) + } + + if cluster.PreferredMaintenanceWindow != nil { + commonLabels[elasticacheLabelCacheClusterPreferredMaintenanceWindow] = model.LabelValue(*cluster.PreferredMaintenanceWindow) + } + + if cluster.PreferredOutpostArn != nil { + commonLabels[elasticacheLabelCacheClusterPreferredOutpostARN] = model.LabelValue(*cluster.PreferredOutpostArn) + } + + if cluster.ReplicationGroupId != nil { + commonLabels[elasticacheLabelCacheClusterReplicationGroupID] = model.LabelValue(*cluster.ReplicationGroupId) + } + + if cluster.ReplicationGroupLogDeliveryEnabled != nil { + commonLabels[elasticacheLabelCacheClusterReplicationGroupLogDeliveryEnabled] = model.LabelValue(strconv.FormatBool(*cluster.ReplicationGroupLogDeliveryEnabled)) + } + + if cluster.SnapshotRetentionLimit != nil { + commonLabels[elasticacheLabelCacheClusterSnapshotRetentionLimit] = model.LabelValue(strconv.Itoa(int(*cluster.SnapshotRetentionLimit))) + } + + if cluster.SnapshotWindow != nil { + commonLabels[elasticacheLabelCacheClusterSnapshotWindow] = model.LabelValue(*cluster.SnapshotWindow) + } + + if cluster.TransitEncryptionEnabled != nil { + commonLabels[elasticacheLabelCacheClusterTransitEncryptionEnabled] = model.LabelValue(strconv.FormatBool(*cluster.TransitEncryptionEnabled)) + } + + if len(cluster.TransitEncryptionMode) > 0 { + commonLabels[elasticacheLabelCacheClusterTransitEncryptionMode] = model.LabelValue(cluster.TransitEncryptionMode) + } + + // Log delivery configurations (slice) + for i, logDelivery := range cluster.LogDeliveryConfigurations { + if len(logDelivery.DestinationType) > 0 { + commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationDestinationType, i))] = model.LabelValue(logDelivery.DestinationType) + } + if len(logDelivery.LogFormat) > 0 { + commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationLogFormat, i))] = model.LabelValue(logDelivery.LogFormat) + } + if len(logDelivery.LogType) > 0 { + commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationLogType, i))] = model.LabelValue(logDelivery.LogType) + } + if len(logDelivery.Status) > 0 { + commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationStatus, i))] = model.LabelValue(logDelivery.Status) + } + if logDelivery.Message != nil { + commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationMessage, i))] = model.LabelValue(*logDelivery.Message) + } + if logDelivery.DestinationDetails != nil { + if logDelivery.DestinationDetails.CloudWatchLogsDetails != nil && logDelivery.DestinationDetails.CloudWatchLogsDetails.LogGroup != nil { + commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationLogGroup, i))] = model.LabelValue(*logDelivery.DestinationDetails.CloudWatchLogsDetails.LogGroup) + } + if logDelivery.DestinationDetails.KinesisFirehoseDetails != nil && logDelivery.DestinationDetails.KinesisFirehoseDetails.DeliveryStream != nil { + commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationDeliveryStream, i))] = model.LabelValue(*logDelivery.DestinationDetails.KinesisFirehoseDetails.DeliveryStream) + } + } + } + + // Pending modified values + if cluster.PendingModifiedValues != nil { + if len(cluster.PendingModifiedValues.AuthTokenStatus) > 0 { + commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesAuthTokenStatus] = model.LabelValue(cluster.PendingModifiedValues.AuthTokenStatus) + } + if cluster.PendingModifiedValues.CacheNodeType != nil { + commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeType] = model.LabelValue(*cluster.PendingModifiedValues.CacheNodeType) + } + if cluster.PendingModifiedValues.EngineVersion != nil { + commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesEngineVersion] = model.LabelValue(*cluster.PendingModifiedValues.EngineVersion) + } + if cluster.PendingModifiedValues.NumCacheNodes != nil { + commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesNumCacheNodes] = model.LabelValue(strconv.Itoa(int(*cluster.PendingModifiedValues.NumCacheNodes))) + } + if cluster.PendingModifiedValues.TransitEncryptionEnabled != nil { + commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionEnabled] = model.LabelValue(strconv.FormatBool(*cluster.PendingModifiedValues.TransitEncryptionEnabled)) + } + if len(cluster.PendingModifiedValues.TransitEncryptionMode) > 0 { + commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionMode] = model.LabelValue(cluster.PendingModifiedValues.TransitEncryptionMode) + } + if len(cluster.PendingModifiedValues.CacheNodeIdsToRemove) > 0 { + commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeIDsToRemove] = model.LabelValue(strings.Join(cluster.PendingModifiedValues.CacheNodeIdsToRemove, ",")) + } + } + + // Security group membership (slice) + for i, sg := range cluster.SecurityGroups { + if sg.SecurityGroupId != nil { + commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterSecurityGroupMembershipID, i))] = model.LabelValue(*sg.SecurityGroupId) + } + if sg.Status != nil { + commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterSecurityGroupMembershipStatus, i))] = model.LabelValue(*sg.Status) + } + } + + // Tags + for _, tag := range tags { + if tag.Key != nil && tag.Value != nil { + commonLabels[model.LabelName(elasticacheLabelCacheClusterTag+strutil.SanitizeLabelName(*tag.Key))] = model.LabelValue(*tag.Value) + } + } + + // Create one target per cache node + for _, node := range cluster.CacheNodes { + // Clone common labels for this node + labels := make(model.LabelSet, len(commonLabels)) + maps.Copy(labels, commonLabels) + + // Add node-specific labels + if node.CacheNodeId != nil { + labels[elasticacheLabelCacheClusterNodeID] = model.LabelValue(*node.CacheNodeId) + } + if node.CacheNodeStatus != nil { + labels[elasticacheLabelCacheClusterNodeStatus] = model.LabelValue(*node.CacheNodeStatus) + } + if node.CacheNodeCreateTime != nil { + labels[elasticacheLabelCacheClusterNodeCreateTime] = model.LabelValue(node.CacheNodeCreateTime.Format(time.RFC3339)) + } + if node.CustomerAvailabilityZone != nil { + labels[elasticacheLabelCacheClusterNodeAZ] = model.LabelValue(*node.CustomerAvailabilityZone) + } + if node.CustomerOutpostArn != nil { + labels[elasticacheLabelCacheClusterNodeCustomerOutpostARN] = model.LabelValue(*node.CustomerOutpostArn) + } + if node.SourceCacheNodeId != nil { + labels[elasticacheLabelCacheClusterNodeSourceCacheNodeID] = model.LabelValue(*node.SourceCacheNodeId) + } + if node.ParameterGroupStatus != nil { + labels[elasticacheLabelCacheClusterNodeParameterGroupStatus] = model.LabelValue(*node.ParameterGroupStatus) + } + if node.Endpoint != nil { + if node.Endpoint.Address != nil { + labels[elasticacheLabelCacheClusterNodeEndpointAddress] = model.LabelValue(*node.Endpoint.Address) + } + if node.Endpoint.Port != nil { + labels[elasticacheLabelCacheClusterNodeEndpointPort] = model.LabelValue(strconv.Itoa(int(*node.Endpoint.Port))) + } + + // Set the address label to this node's endpoint + if node.Endpoint.Address != nil && node.Endpoint.Port != nil { + labels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(*node.Endpoint.Address, strconv.Itoa(int(*node.Endpoint.Port)))) + } + } + + tg.Targets = append(tg.Targets, labels) + } +} diff --git a/discovery/aws/elasticache_test.go b/discovery/aws/elasticache_test.go new file mode 100644 index 0000000000..4611f33059 --- /dev/null +++ b/discovery/aws/elasticache_test.go @@ -0,0 +1,615 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aws + +import ( + "context" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/elasticache" + "github.com/aws/aws-sdk-go-v2/service/elasticache/types" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/discovery/targetgroup" +) + +// Struct for test data. +type elasticacheDataStore struct { + region string + serverlessCaches []types.ServerlessCache + cacheClusters []types.CacheCluster + tags map[string][]types.Tag // keyed by cache ARN +} + +func TestElasticacheDiscoveryDescribeServerlessCaches(t *testing.T) { + ctx := context.Background() + + for _, tt := range []struct { + name string + ecData *elasticacheDataStore + cacheNames []string + expectedCount int + }{ + { + name: "MultipleCaches", + ecData: &elasticacheDataStore{ + region: "us-west-2", + serverlessCaches: []types.ServerlessCache{ + { + ServerlessCacheName: strptr("test-cache"), + ARN: strptr("arn:aws:elasticache:us-west-2:123456789012:serverlesscache:test-cache"), + Status: strptr("available"), + Engine: strptr("redis"), + FullEngineVersion: strptr("7.1"), + CreateTime: aws.Time(time.Now()), + Endpoint: &types.Endpoint{ + Address: strptr("test-cache.serverless.use1.cache.amazonaws.com"), + Port: aws.Int32(6379), + }, + }, + { + ServerlessCacheName: strptr("prod-cache"), + ARN: strptr("arn:aws:elasticache:us-west-2:123456789012:serverlesscache:prod-cache"), + Status: strptr("available"), + Engine: strptr("valkey"), + FullEngineVersion: strptr("7.2"), + CreateTime: aws.Time(time.Now()), + Endpoint: &types.Endpoint{ + Address: strptr("prod-cache.serverless.use1.cache.amazonaws.com"), + Port: aws.Int32(6379), + }, + }, + }, + }, + cacheNames: []string{}, + expectedCount: 2, + }, + { + name: "SingleCache", + ecData: &elasticacheDataStore{ + region: "us-east-1", + serverlessCaches: []types.ServerlessCache{ + { + ServerlessCacheName: strptr("single-cache"), + ARN: strptr("arn:aws:elasticache:us-east-1:123456789012:serverlesscache:single-cache"), + Status: strptr("available"), + Engine: strptr("redis"), + FullEngineVersion: strptr("7.1"), + CreateTime: aws.Time(time.Now()), + }, + }, + }, + cacheNames: []string{"single-cache"}, + expectedCount: 1, + }, + { + name: "NoCaches", + ecData: &elasticacheDataStore{ + region: "us-east-1", + serverlessCaches: []types.ServerlessCache{}, + }, + cacheNames: []string{}, + expectedCount: 0, + }, + } { + t.Run(tt.name, func(t *testing.T) { + client := newMockElasticacheClient(tt.ecData) + + d := &ElasticacheDiscovery{ + elasticacheClient: client, + cfg: &ElasticacheSDConfig{ + Region: tt.ecData.region, + RequestConcurrency: 10, + }, + } + + caches, err := d.describeServerlessCaches(ctx, tt.cacheNames) + require.NoError(t, err) + require.Len(t, caches, tt.expectedCount) + }) + } +} + +func TestElasticacheDiscoveryDescribeCacheClusters(t *testing.T) { + ctx := context.Background() + + for _, tt := range []struct { + name string + ecData *elasticacheDataStore + clusterIDs []string + expectedCount int + skipTest bool + }{ + { + name: "MockValidation", + ecData: &elasticacheDataStore{ + region: "us-west-2", + cacheClusters: []types.CacheCluster{ + { + CacheClusterId: strptr("test-cluster-001"), + ARN: strptr("arn:aws:elasticache:us-west-2:123456789012:cluster:test-cluster-001"), + CacheClusterStatus: strptr("available"), + Engine: strptr("redis"), + EngineVersion: strptr("7.1"), + CacheNodeType: strptr("cache.t3.micro"), + NumCacheNodes: aws.Int32(1), + ConfigurationEndpoint: &types.Endpoint{ + Address: strptr("test-cluster.abc123.cfg.use1.cache.amazonaws.com"), + Port: aws.Int32(6379), + }, + }, + }, + }, + clusterIDs: []string{}, + expectedCount: 1, + skipTest: false, + }, + { + name: "NoClusters", + ecData: &elasticacheDataStore{ + region: "us-east-1", + cacheClusters: []types.CacheCluster{}, + }, + clusterIDs: []string{}, + expectedCount: 0, + skipTest: false, + }, + } { + t.Run(tt.name, func(t *testing.T) { + if tt.skipTest { + t.Skip("Skipping complex test with concurrency") + } + client := newMockElasticacheClient(tt.ecData) + + // Verify mock returns expected data + output, err := client.DescribeCacheClusters(ctx, &elasticache.DescribeCacheClustersInput{}) + require.NoError(t, err) + require.Len(t, output.CacheClusters, tt.expectedCount) + }) + } +} + +func TestAddServerlessCacheTargets(t *testing.T) { + testTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + + tests := []struct { + name string + cache *types.ServerlessCache + tags []types.Tag + expectedLabels model.LabelSet + }{ + { + name: "ServerlessCacheWithEndpoint", + cache: &types.ServerlessCache{ + ServerlessCacheName: strptr("my-cache"), + ARN: strptr("arn:aws:elasticache:us-east-1:123456789012:serverlesscache:my-cache"), + Status: strptr("available"), + Engine: strptr("redis"), + FullEngineVersion: strptr("7.1"), + MajorEngineVersion: strptr("7"), + CreateTime: aws.Time(testTime), + Endpoint: &types.Endpoint{ + Address: strptr("my-cache.serverless.use1.cache.amazonaws.com"), + Port: aws.Int32(6379), + }, + ReaderEndpoint: &types.Endpoint{ + Address: strptr("my-cache-ro.serverless.use1.cache.amazonaws.com"), + Port: aws.Int32(6379), + }, + SecurityGroupIds: []string{"sg-12345"}, + SubnetIds: []string{"subnet-abcdef"}, + CacheUsageLimits: &types.CacheUsageLimits{ + DataStorage: &types.DataStorage{ + Maximum: aws.Int32(10), + Minimum: aws.Int32(1), + Unit: types.DataStorageUnitGb, + }, + ECPUPerSecond: &types.ECPUPerSecond{ + Maximum: aws.Int32(5000), + Minimum: aws.Int32(1000), + }, + }, + }, + tags: []types.Tag{ + {Key: strptr("Environment"), Value: strptr("test")}, + }, + expectedLabels: model.LabelSet{ + "__meta_elasticache_deployment_option": "serverless", + "__meta_elasticache_serverless_cache_arn": "arn:aws:elasticache:us-east-1:123456789012:serverlesscache:my-cache", + "__meta_elasticache_serverless_cache_name": "my-cache", + "__meta_elasticache_serverless_cache_status": "available", + "__meta_elasticache_serverless_cache_engine": "redis", + "__meta_elasticache_serverless_cache_full_engine_version": "7.1", + "__meta_elasticache_serverless_cache_major_engine_version": "7", + "__meta_elasticache_serverless_cache_create_time": "2024-01-01T00:00:00Z", + "__meta_elasticache_serverless_cache_endpoint_address": "my-cache.serverless.use1.cache.amazonaws.com", + "__meta_elasticache_serverless_cache_endpoint_port": "6379", + + "__meta_elasticache_serverless_cache_security_group_id_0": "sg-12345", + "__meta_elasticache_serverless_cache_subnet_id_0": "subnet-abcdef", + + "__address__": "my-cache.serverless.use1.cache.amazonaws.com:6379", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tg := &targetgroup.Group{ + Source: "test", + } + + addServerlessCacheTargets(tg, tt.cache, tt.tags) + + require.Len(t, tg.Targets, 1) + labels := tg.Targets[0] + + // Check that all expected labels are present with correct values + for k, v := range tt.expectedLabels { + actualValue, exists := labels[k] + require.True(t, exists, "label %s should exist", k) + require.Equal(t, v, actualValue, "label %s mismatch", k) + } + }) + } +} + +func TestAddCacheClusterTargets(t *testing.T) { + testTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) + + tests := []struct { + name string + cluster *types.CacheCluster + tags []types.Tag + expectedTargetCount int + expectedLabels []model.LabelSet // One per node + }{ + { + name: "CacheClusterWithMultipleNodes", + cluster: &types.CacheCluster{ + CacheClusterId: strptr("my-cluster-001"), + ARN: strptr("arn:aws:elasticache:us-east-1:123456789012:cluster:my-cluster-001"), + CacheClusterStatus: strptr("available"), + Engine: strptr("redis"), + EngineVersion: strptr("7.1"), + CacheNodeType: strptr("cache.t3.micro"), + NumCacheNodes: aws.Int32(2), + CacheClusterCreateTime: aws.Time(testTime), + ConfigurationEndpoint: &types.Endpoint{ + Address: strptr("my-cluster.abc123.cfg.use1.cache.amazonaws.com"), + Port: aws.Int32(6379), + }, + AtRestEncryptionEnabled: aws.Bool(true), + TransitEncryptionEnabled: aws.Bool(true), + AuthTokenEnabled: aws.Bool(true), + AutoMinorVersionUpgrade: aws.Bool(true), + CacheSubnetGroupName: strptr("my-subnet-group"), + PreferredAvailabilityZone: strptr("us-east-1a"), + SecurityGroups: []types.SecurityGroupMembership{ + { + SecurityGroupId: strptr("sg-12345"), + Status: strptr("active"), + }, + }, + CacheNodes: []types.CacheNode{ + { + CacheNodeId: strptr("0001"), + CacheNodeStatus: strptr("available"), + CacheNodeCreateTime: aws.Time(testTime), + CustomerAvailabilityZone: strptr("us-east-1a"), + Endpoint: &types.Endpoint{ + Address: strptr("my-cluster-001.abc123.0001.use1.cache.amazonaws.com"), + Port: aws.Int32(6379), + }, + }, + { + CacheNodeId: strptr("0002"), + CacheNodeStatus: strptr("available"), + CacheNodeCreateTime: aws.Time(testTime), + CustomerAvailabilityZone: strptr("us-east-1b"), + Endpoint: &types.Endpoint{ + Address: strptr("my-cluster-001.abc123.0002.use1.cache.amazonaws.com"), + Port: aws.Int32(6379), + }, + }, + }, + }, + tags: []types.Tag{ + {Key: strptr("Environment"), Value: strptr("production")}, + {Key: strptr("Application"), Value: strptr("web-app")}, + }, + expectedTargetCount: 2, + expectedLabels: []model.LabelSet{ + { + "__meta_elasticache_deployment_option": "node", + "__meta_elasticache_cache_cluster_arn": "arn:aws:elasticache:us-east-1:123456789012:cluster:my-cluster-001", + "__meta_elasticache_cache_cluster_cache_cluster_id": "my-cluster-001", + "__meta_elasticache_cache_cluster_cache_cluster_status": "available", + "__meta_elasticache_cache_cluster_engine": "redis", + "__meta_elasticache_cache_cluster_engine_version": "7.1", + "__meta_elasticache_cache_cluster_cache_node_type": "cache.t3.micro", + "__meta_elasticache_cache_cluster_num_cache_nodes": "2", + "__meta_elasticache_cache_cluster_cache_cluster_create_time": "2024-01-01T00:00:00Z", + "__meta_elasticache_cache_cluster_configuration_endpoint_address": "my-cluster.abc123.cfg.use1.cache.amazonaws.com", + "__meta_elasticache_cache_cluster_configuration_endpoint_port": "6379", + "__meta_elasticache_cache_cluster_at_rest_encryption_enabled": "true", + "__meta_elasticache_cache_cluster_transit_encryption_enabled": "true", + "__meta_elasticache_cache_cluster_auth_token_enabled": "true", + "__meta_elasticache_cache_cluster_auto_minor_version_upgrade": "true", + "__meta_elasticache_cache_cluster_cache_subnet_group_name": "my-subnet-group", + "__meta_elasticache_cache_cluster_preferred_availability_zone": "us-east-1a", + "__meta_elasticache_cache_cluster_security_group_membership_id_0": "sg-12345", + "__meta_elasticache_cache_cluster_security_group_membership_status_0": "active", + "__meta_elasticache_cache_cluster_tag_Environment": "production", + "__meta_elasticache_cache_cluster_tag_Application": "web-app", + "__meta_elasticache_cache_cluster_node_id": "0001", + "__meta_elasticache_cache_cluster_node_status": "available", + "__meta_elasticache_cache_cluster_node_create_time": "2024-01-01T00:00:00Z", + "__meta_elasticache_cache_cluster_node_availability_zone": "us-east-1a", + "__meta_elasticache_cache_cluster_node_endpoint_address": "my-cluster-001.abc123.0001.use1.cache.amazonaws.com", + "__meta_elasticache_cache_cluster_node_endpoint_port": "6379", + "__address__": "my-cluster-001.abc123.0001.use1.cache.amazonaws.com:6379", + }, + { + "__meta_elasticache_deployment_option": "node", + "__meta_elasticache_cache_cluster_arn": "arn:aws:elasticache:us-east-1:123456789012:cluster:my-cluster-001", + "__meta_elasticache_cache_cluster_cache_cluster_id": "my-cluster-001", + "__meta_elasticache_cache_cluster_cache_cluster_status": "available", + "__meta_elasticache_cache_cluster_engine": "redis", + "__meta_elasticache_cache_cluster_engine_version": "7.1", + "__meta_elasticache_cache_cluster_cache_node_type": "cache.t3.micro", + "__meta_elasticache_cache_cluster_num_cache_nodes": "2", + "__meta_elasticache_cache_cluster_cache_cluster_create_time": "2024-01-01T00:00:00Z", + "__meta_elasticache_cache_cluster_configuration_endpoint_address": "my-cluster.abc123.cfg.use1.cache.amazonaws.com", + "__meta_elasticache_cache_cluster_configuration_endpoint_port": "6379", + "__meta_elasticache_cache_cluster_at_rest_encryption_enabled": "true", + "__meta_elasticache_cache_cluster_transit_encryption_enabled": "true", + "__meta_elasticache_cache_cluster_auth_token_enabled": "true", + "__meta_elasticache_cache_cluster_auto_minor_version_upgrade": "true", + "__meta_elasticache_cache_cluster_cache_subnet_group_name": "my-subnet-group", + "__meta_elasticache_cache_cluster_preferred_availability_zone": "us-east-1a", + "__meta_elasticache_cache_cluster_security_group_membership_id_0": "sg-12345", + "__meta_elasticache_cache_cluster_security_group_membership_status_0": "active", + "__meta_elasticache_cache_cluster_tag_Environment": "production", + "__meta_elasticache_cache_cluster_tag_Application": "web-app", + "__meta_elasticache_cache_cluster_node_id": "0002", + "__meta_elasticache_cache_cluster_node_status": "available", + "__meta_elasticache_cache_cluster_node_create_time": "2024-01-01T00:00:00Z", + "__meta_elasticache_cache_cluster_node_availability_zone": "us-east-1b", + "__meta_elasticache_cache_cluster_node_endpoint_address": "my-cluster-001.abc123.0002.use1.cache.amazonaws.com", + "__meta_elasticache_cache_cluster_node_endpoint_port": "6379", + "__address__": "my-cluster-001.abc123.0002.use1.cache.amazonaws.com:6379", + }, + }, + }, + { + name: "CacheClusterWithSingleNode", + cluster: &types.CacheCluster{ + CacheClusterId: strptr("node-cluster-001"), + ARN: strptr("arn:aws:elasticache:us-east-1:123456789012:cluster:node-cluster-001"), + CacheClusterStatus: strptr("available"), + Engine: strptr("redis"), + EngineVersion: strptr("6.2"), + CacheNodeType: strptr("cache.r6g.large"), + NumCacheNodes: aws.Int32(1), + CacheNodes: []types.CacheNode{ + { + CacheNodeId: strptr("0001"), + CacheNodeStatus: strptr("available"), + CacheNodeCreateTime: aws.Time(testTime), + CustomerAvailabilityZone: strptr("us-east-1a"), + Endpoint: &types.Endpoint{ + Address: strptr("node-cluster-001.abc123.0001.use1.cache.amazonaws.com"), + Port: aws.Int32(6379), + }, + }, + }, + }, + tags: []types.Tag{}, + expectedTargetCount: 1, + expectedLabels: []model.LabelSet{ + { + "__meta_elasticache_deployment_option": "node", + "__meta_elasticache_cache_cluster_arn": "arn:aws:elasticache:us-east-1:123456789012:cluster:node-cluster-001", + "__meta_elasticache_cache_cluster_cache_cluster_id": "node-cluster-001", + "__meta_elasticache_cache_cluster_cache_cluster_status": "available", + "__meta_elasticache_cache_cluster_engine": "redis", + "__meta_elasticache_cache_cluster_engine_version": "6.2", + "__meta_elasticache_cache_cluster_cache_node_type": "cache.r6g.large", + "__meta_elasticache_cache_cluster_num_cache_nodes": "1", + "__meta_elasticache_cache_cluster_node_id": "0001", + "__meta_elasticache_cache_cluster_node_status": "available", + "__meta_elasticache_cache_cluster_node_create_time": "2024-01-01T00:00:00Z", + "__meta_elasticache_cache_cluster_node_availability_zone": "us-east-1a", + "__meta_elasticache_cache_cluster_node_endpoint_address": "node-cluster-001.abc123.0001.use1.cache.amazonaws.com", + "__meta_elasticache_cache_cluster_node_endpoint_port": "6379", + "__address__": "node-cluster-001.abc123.0001.use1.cache.amazonaws.com:6379", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tg := &targetgroup.Group{ + Source: "test", + } + + addCacheClusterTargets(tg, tt.cluster, tt.tags) + + require.Len(t, tg.Targets, tt.expectedTargetCount) + + // Check each target + for i, expectedLabels := range tt.expectedLabels { + labels := tg.Targets[i] + + // Check that all expected labels are present with correct values + for k, v := range expectedLabels { + actualValue, exists := labels[k] + require.True(t, exists, "label %s should exist in target %d", k, i) + require.Equal(t, v, actualValue, "label %s mismatch in target %d", k, i) + } + } + }) + } +} + +// Mock Elasticache client. +type mockElasticacheClient struct { + data *elasticacheDataStore +} + +func newMockElasticacheClient(data *elasticacheDataStore) *mockElasticacheClient { + return &mockElasticacheClient{data: data} +} + +func (m *mockElasticacheClient) DescribeServerlessCaches(_ context.Context, input *elasticache.DescribeServerlessCachesInput, _ ...func(*elasticache.Options)) (*elasticache.DescribeServerlessCachesOutput, error) { + if input.ServerlessCacheName != nil { + // Filter by name + for _, cache := range m.data.serverlessCaches { + if cache.ServerlessCacheName != nil && *cache.ServerlessCacheName == *input.ServerlessCacheName { + return &elasticache.DescribeServerlessCachesOutput{ + ServerlessCaches: []types.ServerlessCache{cache}, + }, nil + } + } + return &elasticache.DescribeServerlessCachesOutput{ + ServerlessCaches: []types.ServerlessCache{}, + }, nil + } + + return &elasticache.DescribeServerlessCachesOutput{ + ServerlessCaches: m.data.serverlessCaches, + }, nil +} + +func (m *mockElasticacheClient) DescribeCacheClusters(_ context.Context, input *elasticache.DescribeCacheClustersInput, _ ...func(*elasticache.Options)) (*elasticache.DescribeCacheClustersOutput, error) { + if input.CacheClusterId != nil { + // Single cluster lookup + for _, cluster := range m.data.cacheClusters { + if cluster.CacheClusterId != nil && *cluster.CacheClusterId == *input.CacheClusterId { + return &elasticache.DescribeCacheClustersOutput{ + CacheClusters: []types.CacheCluster{cluster}, + }, nil + } + } + return &elasticache.DescribeCacheClustersOutput{ + CacheClusters: []types.CacheCluster{}, + }, nil + } + + return &elasticache.DescribeCacheClustersOutput{ + CacheClusters: m.data.cacheClusters, + }, nil +} + +func (m *mockElasticacheClient) ListTagsForResource(_ context.Context, input *elasticache.ListTagsForResourceInput, _ ...func(*elasticache.Options)) (*elasticache.ListTagsForResourceOutput, error) { + if input.ResourceName != nil { + if tags, ok := m.data.tags[*input.ResourceName]; ok { + return &elasticache.ListTagsForResourceOutput{ + TagList: tags, + }, nil + } + } + + return &elasticache.ListTagsForResourceOutput{ + TagList: []types.Tag{}, + }, nil +} + +func TestSplitCacheDeploymentOptions(t *testing.T) { + tests := []struct { + name string + caches []string + expectedServerlessCacheIDs []string + expectedCacheClusterIDs []string + }{ + { + name: "MixedARNs", + caches: []string{ + "arn:aws:elasticache:us-east-1:123456789012:serverlesscache:my-serverless-cache", + "arn:aws:elasticache:us-east-1:123456789012:replicationgroup:my-replication-group", + "arn:aws:elasticache:us-west-2:123456789012:serverlesscache:prod-cache", + }, + expectedServerlessCacheIDs: []string{"my-serverless-cache", "prod-cache"}, + expectedCacheClusterIDs: []string{"my-replication-group"}, + }, + { + name: "OnlyServerlessCaches", + caches: []string{ + "arn:aws:elasticache:us-east-1:123456789012:serverlesscache:cache-1", + "arn:aws:elasticache:us-east-1:123456789012:serverlesscache:cache-2", + }, + expectedServerlessCacheIDs: []string{"cache-1", "cache-2"}, + expectedCacheClusterIDs: nil, + }, + { + name: "OnlyReplicationGroups", + caches: []string{ + "arn:aws:elasticache:us-east-1:123456789012:replicationgroup:cluster-1", + "arn:aws:elasticache:us-east-1:123456789012:replicationgroup:cluster-2", + }, + expectedServerlessCacheIDs: nil, + expectedCacheClusterIDs: []string{"cluster-1", "cluster-2"}, + }, + { + name: "EmptyInput", + caches: []string{}, + expectedServerlessCacheIDs: nil, + expectedCacheClusterIDs: nil, + }, + { + name: "InvalidARNs", + caches: []string{ + "not-an-arn", + "arn:aws:elasticache:us-east-1", + "", + }, + expectedServerlessCacheIDs: nil, + expectedCacheClusterIDs: nil, + }, + { + name: "UnknownResourceType", + caches: []string{ + "arn:aws:elasticache:us-east-1:123456789012:unknown:resource-id", + }, + expectedServerlessCacheIDs: nil, + expectedCacheClusterIDs: nil, + }, + { + name: "MixedWithInvalidARNs", + caches: []string{ + "arn:aws:elasticache:us-east-1:123456789012:serverlesscache:valid-cache", + "invalid-arn", + "arn:aws:elasticache:us-east-1:123456789012:replicationgroup:valid-cluster", + "", + "arn:aws:elasticache:us-east-1:123456789012:unknown:ignored", + }, + expectedServerlessCacheIDs: []string{"valid-cache"}, + expectedCacheClusterIDs: []string{"valid-cluster"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + serverlessCacheIDs, cacheClusterIDs := splitCacheDeploymentOptions(tt.caches) + + require.Equal(t, tt.expectedServerlessCacheIDs, serverlessCacheIDs, "serverless cache IDs mismatch") + require.Equal(t, tt.expectedCacheClusterIDs, cacheClusterIDs, "cache cluster IDs mismatch") + }) + } +} diff --git a/discovery/aws/metrics_elasticache.go b/discovery/aws/metrics_elasticache.go new file mode 100644 index 0000000000..7ecfcb4b72 --- /dev/null +++ b/discovery/aws/metrics_elasticache.go @@ -0,0 +1,32 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package aws + +import ( + "github.com/prometheus/prometheus/discovery" +) + +type elasticacheMetrics struct { + refreshMetrics discovery.RefreshMetricsInstantiator +} + +var _ discovery.DiscovererMetrics = (*elasticacheMetrics)(nil) + +// Register implements discovery.DiscovererMetrics. +func (*elasticacheMetrics) Register() error { + return nil +} + +// Unregister implements discovery.DiscovererMetrics. +func (*elasticacheMetrics) Unregister() {} diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index 49b7774b5f..53fd6aaae5 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -1029,11 +1029,117 @@ The following meta labels are available on targets during [relabeling](#relabel_ * `__meta_msk_broker_node_exporter_enabled`: whether node exporter is enabled on brokers (broker nodes only) * `__meta_msk_controller_endpoint_index`: the index of the controller endpoint (controller nodes only) +#### `elasticache` + +The `elasticache` role discovers targets from AWS ElastiCache for both serverless caches and cache clusters. + +**Important**: For cache clusters, one target is created per cache node. Each target includes the cluster-level labels (ARN, status, tags, etc.) and node-specific labels (node ID, endpoint, availability zone, etc.). The `__address__` label is set to the individual node's endpoint address and port. + +For serverless caches, one target is created per serverless cache, with the `__address__` label set to the serverless cache endpoint. + +The IAM credentials used must have the following permissions to discover scrape targets: + +- `elasticache:DescribeServerlessCaches` +- `elasticache:DescribeCacheClusters` +- `elasticache:ListTagsForResource` + +The following meta labels are available on targets during [relabeling](#relabel_config): + +**Common labels (available on all targets):** + +* `__meta_elasticache_deployment_option`: the deployment option - either `serverless` for serverless caches or `node` for cache cluster nodes + +**Serverless Cache labels:** + +* `__meta_elasticache_serverless_cache_arn`: the ARN of the serverless cache +* `__meta_elasticache_serverless_cache_name`: the name of the serverless cache +* `__meta_elasticache_serverless_cache_status`: the status of the serverless cache +* `__meta_elasticache_serverless_cache_engine`: the cache engine (redis or valkey) +* `__meta_elasticache_serverless_cache_full_engine_version`: the full engine version +* `__meta_elasticache_serverless_cache_major_engine_version`: the major engine version +* `__meta_elasticache_serverless_cache_description`: the description of the serverless cache +* `__meta_elasticache_serverless_cache_create_time`: the creation time in RFC3339 format +* `__meta_elasticache_serverless_cache_snapshot_retention_limit`: the snapshot retention limit in days +* `__meta_elasticache_serverless_cache_daily_snapshot_time`: the daily snapshot time +* `__meta_elasticache_serverless_cache_user_group_id`: the user group ID +* `__meta_elasticache_serverless_cache_kms_key_id`: the KMS key ID for encryption at rest +* `__meta_elasticache_serverless_cache_endpoint_address`: the endpoint address +* `__meta_elasticache_serverless_cache_endpoint_port`: the endpoint port +* `__meta_elasticache_serverless_cache_reader_endpoint_address`: the reader endpoint address +* `__meta_elasticache_serverless_cache_reader_endpoint_port`: the reader endpoint port +* `__meta_elasticache_serverless_cache_security_group_id_`: security group IDs (indexed) +* `__meta_elasticache_serverless_cache_subnet_id_`: subnet IDs (indexed) +* `__meta_elasticache_serverless_cache_cache_usage_limit_data_storage_maximum`: maximum data storage in the specified unit +* `__meta_elasticache_serverless_cache_cache_usage_limit_data_storage_minimum`: minimum data storage in the specified unit +* `__meta_elasticache_serverless_cache_cache_usage_limit_data_storage_unit`: unit for data storage (e.g., GB) +* `__meta_elasticache_serverless_cache_cache_usage_limit_ecpu_per_second_maximum`: maximum ECPU per second +* `__meta_elasticache_serverless_cache_cache_usage_limit_ecpu_per_second_minimum`: minimum ECPU per second +* `__meta_elasticache_serverless_cache_tag_`: each serverless cache tag value, keyed by tag name + +**Cache Cluster labels:** + +* `__meta_elasticache_cache_cluster_arn`: the ARN of the cache cluster +* `__meta_elasticache_cache_cluster_cache_cluster_id`: the cache cluster ID +* `__meta_elasticache_cache_cluster_cache_cluster_status`: the status of the cache cluster +* `__meta_elasticache_cache_cluster_engine`: the cache engine (redis or memcached) +* `__meta_elasticache_cache_cluster_engine_version`: the engine version +* `__meta_elasticache_cache_cluster_cache_node_type`: the cache node type (e.g., cache.t3.micro) +* `__meta_elasticache_cache_cluster_num_cache_nodes`: the number of cache nodes +* `__meta_elasticache_cache_cluster_cache_cluster_create_time`: the creation time in RFC3339 format +* `__meta_elasticache_cache_cluster_at_rest_encryption_enabled`: whether encryption at rest is enabled +* `__meta_elasticache_cache_cluster_transit_encryption_enabled`: whether encryption in transit is enabled +* `__meta_elasticache_cache_cluster_transit_encryption_mode`: the transit encryption mode +* `__meta_elasticache_cache_cluster_auth_token_enabled`: whether auth token is enabled +* `__meta_elasticache_cache_cluster_auth_token_last_modified`: the last modification time of auth token +* `__meta_elasticache_cache_cluster_auto_minor_version_upgrade`: whether auto minor version upgrade is enabled +* `__meta_elasticache_cache_cluster_cache_parameter_group`: the cache parameter group name +* `__meta_elasticache_cache_cluster_cache_subnet_group_name`: the cache subnet group name +* `__meta_elasticache_cache_cluster_client_download_landing_page`: the client download landing page URL +* `__meta_elasticache_cache_cluster_ip_discovery`: the IP discovery mode (ipv4 or ipv6) +* `__meta_elasticache_cache_cluster_network_type`: the network type (ipv4, ipv6, or dual_stack) +* `__meta_elasticache_cache_cluster_preferred_availability_zone`: the preferred availability zone +* `__meta_elasticache_cache_cluster_preferred_maintenance_window`: the preferred maintenance window +* `__meta_elasticache_cache_cluster_preferred_outpost_arn`: the preferred outpost ARN +* `__meta_elasticache_cache_cluster_replication_group_id`: the replication group ID (for Redis clusters that are part of a replication group) +* `__meta_elasticache_cache_cluster_replication_group_log_delivery_enabled`: whether log delivery is enabled for the replication group +* `__meta_elasticache_cache_cluster_snapshot_retention_limit`: the snapshot retention limit in days +* `__meta_elasticache_cache_cluster_snapshot_window`: the daily snapshot window +* `__meta_elasticache_cache_cluster_configuration_endpoint_address`: the configuration endpoint address (cluster mode enabled only) +* `__meta_elasticache_cache_cluster_configuration_endpoint_port`: the configuration endpoint port (cluster mode enabled only) +* `__meta_elasticache_cache_cluster_notification_topic_arn`: the SNS topic ARN for notifications +* `__meta_elasticache_cache_cluster_notification_topic_status`: the SNS topic status +* `__meta_elasticache_cache_cluster_log_delivery_configuration_destination_type_`: log delivery destination type (cloudwatch-logs or kinesis-firehose) +* `__meta_elasticache_cache_cluster_log_delivery_configuration_log_format_`: log format (text or json) +* `__meta_elasticache_cache_cluster_log_delivery_configuration_log_type_`: log type (slow-log or engine-log) +* `__meta_elasticache_cache_cluster_log_delivery_configuration_status_`: log delivery status +* `__meta_elasticache_cache_cluster_log_delivery_configuration_message_`: log delivery message +* `__meta_elasticache_cache_cluster_log_delivery_configuration_log_group_`: CloudWatch log group name (cloudwatch-logs destination only) +* `__meta_elasticache_cache_cluster_log_delivery_configuration_delivery_stream_`: Kinesis Firehose delivery stream name (kinesis-firehose destination only) +* `__meta_elasticache_cache_cluster_pending_modified_values_auth_token_status`: pending auth token status +* `__meta_elasticache_cache_cluster_pending_modified_values_cache_node_type`: pending cache node type change +* `__meta_elasticache_cache_cluster_pending_modified_values_engine_version`: pending engine version upgrade +* `__meta_elasticache_cache_cluster_pending_modified_values_num_cache_nodes`: pending number of cache nodes +* `__meta_elasticache_cache_cluster_pending_modified_values_transit_encryption_enabled`: pending transit encryption status +* `__meta_elasticache_cache_cluster_pending_modified_values_transit_encryption_mode`: pending transit encryption mode +* `__meta_elasticache_cache_cluster_pending_modified_values_cache_node_ids_to_remove`: comma-separated list of cache node IDs to be removed +* `__meta_elasticache_cache_cluster_security_group_membership_id_`: security group ID (indexed) +* `__meta_elasticache_cache_cluster_security_group_membership_status_`: security group status (indexed) +* `__meta_elasticache_cache_cluster_node_id`: cache node ID +* `__meta_elasticache_cache_cluster_node_status`: cache node status +* `__meta_elasticache_cache_cluster_node_create_time`: cache node creation time in RFC3339 format +* `__meta_elasticache_cache_cluster_node_availability_zone`: cache node availability zone +* `__meta_elasticache_cache_cluster_node_customer_outpost_arn`: cache node outpost ARN +* `__meta_elasticache_cache_cluster_node_source_cache_node_id`: source cache node ID for replication +* `__meta_elasticache_cache_cluster_node_parameter_group_status`: parameter group status +* `__meta_elasticache_cache_cluster_node_endpoint_address`: cache node endpoint address +* `__meta_elasticache_cache_cluster_node_endpoint_port`: cache node endpoint port +* `__meta_elasticache_cache_cluster_tag_`: each cache cluster tag value, keyed by tag name + See below for the configuration options for AWS discovery: ```yaml # The AWS role to use for service discovery. -# Must be one of: ec2, lightsail, ecs, or msk. +# Must be one of: ec2, lightsail, ecs, msk, or elasticache. role: # The AWS region. If blank, the region from the instance metadata is used. @@ -1069,8 +1175,9 @@ filters: [ - name: values: , [...] ] -# List of ECS or MSK cluster ARNs (ecs and msk roles only) to discover. If empty, all clusters in the region are discovered. -# This can significantly improve performance when you only need to monitor specific clusters. +# List of ECS, ElastiCache, or MSK cluster identifiers (ecs, elasticache, and msk roles only) to discover. +# A List of ARNs of clusters to discover. If empty, all clusters in the region are discovered. +# This can significantly improve performance when you only need to monitor specific clusters/caches. [ clusters: [, ...] ] # HTTP client settings, including authentication methods (such as basic auth and diff --git a/go.mod b/go.mod index 36d46646e3..ee28796944 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.19.9 github.com/aws/aws-sdk-go-v2/service/ec2 v1.290.0 github.com/aws/aws-sdk-go-v2/service/ecs v1.72.0 + github.com/aws/aws-sdk-go-v2/service/elasticache v1.51.9 github.com/aws/aws-sdk-go-v2/service/kafka v1.48.0 github.com/aws/aws-sdk-go-v2/service/lightsail v1.50.11 github.com/aws/aws-sdk-go-v2/service/sts v1.41.6 diff --git a/go.sum b/go.sum index 51a62403c8..a73f648482 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,8 @@ github.com/aws/aws-sdk-go-v2/service/ec2 v1.290.0 h1:Ub4CvLWf8wEQ7/pEiqXM9tTsHXf github.com/aws/aws-sdk-go-v2/service/ec2 v1.290.0/go.mod h1:Uy+C+Sc58jozdoL1McQr8bDsEvNFx+/nBY+vpO1HVUY= github.com/aws/aws-sdk-go-v2/service/ecs v1.72.0 h1:hggRKpv26DpYMOik3wWo1Ty5MkANoXhNobjfWpC3G4M= github.com/aws/aws-sdk-go-v2/service/ecs v1.72.0/go.mod h1:pMlGFDpHoLTJOIZHGdJOAWmi+xeIlQXuFTuQxs1epYE= +github.com/aws/aws-sdk-go-v2/service/elasticache v1.51.9 h1:hTgZLyNoDWphZUtTtcvQh0LP6TZO0mtdSfZK/GObDLk= +github.com/aws/aws-sdk-go-v2/service/elasticache v1.51.9/go.mod h1:91RkIYy9ubykxB50XGYDsbljLZnrZ6rp/Urt4rZrbwQ= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 h1:0ryTNEdJbzUCEWkVXEXoqlXV72J5keC1GvILMOuD00E= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4/go.mod h1:HQ4qwNZh32C3CBeO6iJLQlgtMzqeG17ziAA/3KDJFow= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.17 h1:RuNSMoozM8oXlgLG/n6WLaFGoea7/CddrCfIiSA+xdY=