[FEATURE] AWS SD: Add Elasticache Role (#18099)

* AWS SD: Elasticache

This change adds Elasticache to the AWS SD.

Co-authored-by: Ben Kochie <superq@gmail.com>
Signed-off-by: Matt <small_minority@hotmail.com>

---------

Signed-off-by: Matt <small_minority@hotmail.com>
Co-authored-by: Ben Kochie <superq@gmail.com>
This commit is contained in:
Matt 2026-02-22 14:56:03 +00:00 committed by GitHub
parent a737448dd6
commit ce30ae49f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1713 additions and 12 deletions

View File

@ -186,6 +186,7 @@
"dockerswarm": true,
"ec2": true,
"ecs": true,
"elasticache": true,
"eureka": true,
"file": true,
"gce": true,

View File

@ -43,10 +43,11 @@ type Role string
// The valid options for Role.
const (
RoleEC2 Role = "ec2"
RoleECS Role = "ecs"
RoleLightsail Role = "lightsail"
RoleMSK Role = "msk"
RoleEC2 Role = "ec2"
RoleECS Role = "ecs"
RoleElasticache Role = "elasticache"
RoleLightsail Role = "lightsail"
RoleMSK Role = "msk"
)
// UnmarshalYAML implements the yaml.Unmarshaler interface.
@ -55,7 +56,7 @@ func (c *Role) UnmarshalYAML(unmarshal func(any) error) error {
return err
}
switch *c {
case RoleEC2, RoleECS, RoleLightsail, RoleMSK:
case RoleEC2, RoleECS, RoleElasticache, RoleLightsail, RoleMSK:
return nil
default:
return fmt.Errorf("unknown AWS SD role %q", *c)
@ -86,10 +87,11 @@ type SDConfig struct {
Clusters []string `yaml:"clusters,omitempty"`
// Embedded sub-configs (internal use only, not serialized)
*EC2SDConfig `yaml:"-"`
*ECSSDConfig `yaml:"-"`
*LightsailSDConfig `yaml:"-"`
*MSKSDConfig `yaml:"-"`
*EC2SDConfig `yaml:"-"`
*ECSSDConfig `yaml:"-"`
*ElasticacheSDConfig `yaml:"-"`
*LightsailSDConfig `yaml:"-"`
*MSKSDConfig `yaml:"-"`
}
// UnmarshalYAML implements the yaml.Unmarshaler interface for SDConfig.
@ -172,6 +174,37 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(any) error) error {
if c.Clusters != nil {
c.ECSSDConfig.Clusters = c.Clusters
}
case RoleElasticache:
if c.ElasticacheSDConfig == nil {
elasticacheConfig := DefaultElasticacheSDConfig
c.ElasticacheSDConfig = &elasticacheConfig
}
c.ElasticacheSDConfig.HTTPClientConfig = c.HTTPClientConfig
c.ElasticacheSDConfig.Region = c.Region
if c.Endpoint != "" {
c.ElasticacheSDConfig.Endpoint = c.Endpoint
}
if c.AccessKey != "" {
c.ElasticacheSDConfig.AccessKey = c.AccessKey
}
if c.SecretKey != "" {
c.ElasticacheSDConfig.SecretKey = c.SecretKey
}
if c.Profile != "" {
c.ElasticacheSDConfig.Profile = c.Profile
}
if c.RoleARN != "" {
c.ElasticacheSDConfig.RoleARN = c.RoleARN
}
if c.Port != 0 {
c.ElasticacheSDConfig.Port = c.Port
}
if c.RefreshInterval != 0 {
c.ElasticacheSDConfig.RefreshInterval = c.RefreshInterval
}
if c.Clusters != nil {
c.ElasticacheSDConfig.Clusters = c.Clusters
}
case RoleLightsail:
if c.LightsailSDConfig == nil {
lightsailConfig := DefaultLightsailSDConfig
@ -259,6 +292,9 @@ func (c *SDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Di
case RoleECS:
opts.Metrics = &ecsMetrics{refreshMetrics: awsMetrics.refreshMetrics}
return NewECSDiscovery(c.ECSSDConfig, opts)
case RoleElasticache:
opts.Metrics = &elasticacheMetrics{refreshMetrics: awsMetrics.refreshMetrics}
return NewElasticacheDiscovery(c.ElasticacheSDConfig, opts)
case RoleLightsail:
opts.Metrics = &lightsailMetrics{refreshMetrics: awsMetrics.refreshMetrics}
return NewLightsailDiscovery(c.LightsailSDConfig, opts)

View File

@ -0,0 +1,907 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aws
import (
"context"
"errors"
"fmt"
"log/slog"
"maps"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
"github.com/aws/aws-sdk-go-v2/service/elasticache"
"github.com/aws/aws-sdk-go-v2/service/elasticache/types"
"github.com/aws/aws-sdk-go-v2/service/sts"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"golang.org/x/sync/errgroup"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/refresh"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/strutil"
)
// Label names exposed by the Elasticache SD. Every label is built on
// model.MetaLabelPrefix ("__meta_") so it is discarded after relabeling.
// Prefix constants (ending in "_") are composed with suffixes below; keep the
// trailing underscore convention when adding new prefixes.
const (
	elasticacheLabel                 = model.MetaLabelPrefix + "elasticache_"
	elasticacheLabelDeploymentOption = elasticacheLabel + "deployment_option"
	// cache cluster.
	elasticacheLabelCacheCluster                                   = elasticacheLabel + "cache_cluster_"
	elasticacheLabelCacheClusterARN                                = elasticacheLabelCacheCluster + "arn"
	elasticacheLabelCacheClusterAtRestEncryptionEnabled            = elasticacheLabelCacheCluster + "at_rest_encryption_enabled"
	elasticacheLabelCacheClusterAuthTokenEnabled                   = elasticacheLabelCacheCluster + "auth_token_enabled"
	elasticacheLabelCacheClusterAuthTokenLastModified              = elasticacheLabelCacheCluster + "auth_token_last_modified"
	elasticacheLabelCacheClusterAutoMinorVersionUpgrade            = elasticacheLabelCacheCluster + "auto_minor_version_upgrade"
	elasticacheLabelCacheClusterCreateTime                         = elasticacheLabelCacheCluster + "cache_cluster_create_time"
	elasticacheLabelCacheClusterID                                 = elasticacheLabelCacheCluster + "cache_cluster_id"
	elasticacheLabelCacheClusterStatus                             = elasticacheLabelCacheCluster + "cache_cluster_status"
	elasticacheLabelCacheClusterNodeType                           = elasticacheLabelCacheCluster + "cache_node_type"
	elasticacheLabelCacheClusterParameterGroup                     = elasticacheLabelCacheCluster + "cache_parameter_group"
	elasticacheLabelCacheClusterSubnetGroupName                    = elasticacheLabelCacheCluster + "cache_subnet_group_name"
	elasticacheLabelCacheClusterClientDownloadLandingPage          = elasticacheLabelCacheCluster + "client_download_landing_page"
	elasticacheLabelCacheClusterEngine                             = elasticacheLabelCacheCluster + "engine"
	elasticacheLabelCacheClusterEngineVersion                      = elasticacheLabelCacheCluster + "engine_version"
	elasticacheLabelCacheClusterIPDiscovery                        = elasticacheLabelCacheCluster + "ip_discovery"
	elasticacheLabelCacheClusterNetworkType                        = elasticacheLabelCacheCluster + "network_type"
	elasticacheLabelCacheClusterNumCacheNodes                      = elasticacheLabelCacheCluster + "num_cache_nodes"
	elasticacheLabelCacheClusterPreferredAvailabilityZone          = elasticacheLabelCacheCluster + "preferred_availability_zone"
	elasticacheLabelCacheClusterPreferredMaintenanceWindow         = elasticacheLabelCacheCluster + "preferred_maintenance_window"
	elasticacheLabelCacheClusterPreferredOutpostARN                = elasticacheLabelCacheCluster + "preferred_outpost_arn"
	elasticacheLabelCacheClusterReplicationGroupID                 = elasticacheLabelCacheCluster + "replication_group_id"
	elasticacheLabelCacheClusterReplicationGroupLogDeliveryEnabled = elasticacheLabelCacheCluster + "replication_group_log_delivery_enabled"
	elasticacheLabelCacheClusterSnapshotRetentionLimit             = elasticacheLabelCacheCluster + "snapshot_retention_limit"
	elasticacheLabelCacheClusterSnapshotWindow                     = elasticacheLabelCacheCluster + "snapshot_window"
	elasticacheLabelCacheClusterTransitEncryptionEnabled           = elasticacheLabelCacheCluster + "transit_encryption_enabled"
	elasticacheLabelCacheClusterTransitEncryptionMode              = elasticacheLabelCacheCluster + "transit_encryption_mode"
	// configuration endpoint.
	elasticacheLabelCacheClusterConfigurationEndpoint        = elasticacheLabelCacheCluster + "configuration_endpoint_"
	elasticacheLabelCacheClusterConfigurationEndpointAddress = elasticacheLabelCacheClusterConfigurationEndpoint + "address"
	elasticacheLabelCacheClusterConfigurationEndpointPort    = elasticacheLabelCacheClusterConfigurationEndpoint + "port"
	// notification.
	elasticacheLabelCacheClusterNotification            = elasticacheLabelCacheCluster + "notification_"
	elasticacheLabelCacheClusterNotificationTopicARN    = elasticacheLabelCacheClusterNotification + "topic_arn"
	elasticacheLabelCacheClusterNotificationTopicStatus = elasticacheLabelCacheClusterNotification + "topic_status"
	// log delivery configuration (slice - use with index).
	elasticacheLabelCacheClusterLogDeliveryConfiguration                = elasticacheLabelCacheCluster + "log_delivery_configuration_"
	elasticacheLabelCacheClusterLogDeliveryConfigurationDestinationType = elasticacheLabelCacheClusterLogDeliveryConfiguration + "destination_type"
	elasticacheLabelCacheClusterLogDeliveryConfigurationLogFormat       = elasticacheLabelCacheClusterLogDeliveryConfiguration + "log_format"
	elasticacheLabelCacheClusterLogDeliveryConfigurationLogType         = elasticacheLabelCacheClusterLogDeliveryConfiguration + "log_type"
	elasticacheLabelCacheClusterLogDeliveryConfigurationStatus          = elasticacheLabelCacheClusterLogDeliveryConfiguration + "status"
	elasticacheLabelCacheClusterLogDeliveryConfigurationMessage         = elasticacheLabelCacheClusterLogDeliveryConfiguration + "message"
	elasticacheLabelCacheClusterLogDeliveryConfigurationLogGroup        = elasticacheLabelCacheClusterLogDeliveryConfiguration + "log_group"
	elasticacheLabelCacheClusterLogDeliveryConfigurationDeliveryStream  = elasticacheLabelCacheClusterLogDeliveryConfiguration + "delivery_stream"
	// pending modified values.
	elasticacheLabelCacheClusterPendingModifiedValues                         = elasticacheLabelCacheCluster + "pending_modified_values_"
	elasticacheLabelCacheClusterPendingModifiedValuesAuthTokenStatus          = elasticacheLabelCacheClusterPendingModifiedValues + "auth_token_status"
	elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeType            = elasticacheLabelCacheClusterPendingModifiedValues + "cache_node_type"
	elasticacheLabelCacheClusterPendingModifiedValuesEngineVersion            = elasticacheLabelCacheClusterPendingModifiedValues + "engine_version"
	elasticacheLabelCacheClusterPendingModifiedValuesNumCacheNodes            = elasticacheLabelCacheClusterPendingModifiedValues + "num_cache_nodes"
	elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionEnabled = elasticacheLabelCacheClusterPendingModifiedValues + "transit_encryption_enabled"
	elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionMode    = elasticacheLabelCacheClusterPendingModifiedValues + "transit_encryption_mode"
	elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeIDsToRemove     = elasticacheLabelCacheClusterPendingModifiedValues + "cache_node_ids_to_remove"
	// security group membership (slice - use with index).
	elasticacheLabelCacheClusterSecurityGroupMembership       = elasticacheLabelCacheCluster + "security_group_membership_"
	elasticacheLabelCacheClusterSecurityGroupMembershipID     = elasticacheLabelCacheClusterSecurityGroupMembership + "id"
	elasticacheLabelCacheClusterSecurityGroupMembershipStatus = elasticacheLabelCacheClusterSecurityGroupMembership + "status"
	// tags - create one label per tag key, with the format: elasticache_cache_cluster_tag_<tagkey>.
	elasticacheLabelCacheClusterTag = elasticacheLabelCacheCluster + "tag_"
	// node.
	elasticacheLabelCacheClusterNode                     = elasticacheLabelCacheCluster + "node_"
	elasticacheLabelCacheClusterNodeCreateTime           = elasticacheLabelCacheClusterNode + "create_time"
	elasticacheLabelCacheClusterNodeID                   = elasticacheLabelCacheClusterNode + "id"
	elasticacheLabelCacheClusterNodeStatus               = elasticacheLabelCacheClusterNode + "status"
	elasticacheLabelCacheClusterNodeAZ                   = elasticacheLabelCacheClusterNode + "availability_zone"
	elasticacheLabelCacheClusterNodeCustomerOutpostARN   = elasticacheLabelCacheClusterNode + "customer_outpost_arn"
	elasticacheLabelCacheClusterNodeSourceCacheNodeID    = elasticacheLabelCacheClusterNode + "source_cache_node_id"
	elasticacheLabelCacheClusterNodeParameterGroupStatus = elasticacheLabelCacheClusterNode + "parameter_group_status"
	// endpoint.
	elasticacheLabelCacheClusterNodeEndpoint        = elasticacheLabelCacheClusterNode + "endpoint_"
	elasticacheLabelCacheClusterNodeEndpointAddress = elasticacheLabelCacheClusterNodeEndpoint + "address"
	elasticacheLabelCacheClusterNodeEndpointPort    = elasticacheLabelCacheClusterNodeEndpoint + "port"
	// serverless cache.
	elasticacheLabelServerlessCache                       = elasticacheLabel + "serverless_cache_"
	elasticacheLabelServerlessCacheARN                    = elasticacheLabelServerlessCache + "arn"
	elasticacheLabelServerlessCacheName                   = elasticacheLabelServerlessCache + "name"
	elasticacheLabelServerlessCacheCreateTime             = elasticacheLabelServerlessCache + "create_time"
	elasticacheLabelServerlessCacheDescription            = elasticacheLabelServerlessCache + "description"
	elasticacheLabelServerlessCacheEngine                 = elasticacheLabelServerlessCache + "engine"
	elasticacheLabelServerlessCacheFullEngineVersion      = elasticacheLabelServerlessCache + "full_engine_version"
	elasticacheLabelServerlessCacheMajorEngineVersion     = elasticacheLabelServerlessCache + "major_engine_version"
	elasticacheLabelServerlessCacheStatus                 = elasticacheLabelServerlessCache + "status"
	elasticacheLabelServerlessCacheKmsKeyID               = elasticacheLabelServerlessCache + "kms_key_id"
	elasticacheLabelServerlessCacheUserGroupID            = elasticacheLabelServerlessCache + "user_group_id"
	elasticacheLabelServerlessCacheDailySnapshotTime      = elasticacheLabelServerlessCache + "daily_snapshot_time"
	elasticacheLabelServerlessCacheSnapshotRetentionLimit = elasticacheLabelServerlessCache + "snapshot_retention_limit"
	// endpoint.
	elasticacheLabelServerlessCacheEndpoint              = elasticacheLabelServerlessCache + "endpoint_"
	elasticacheLabelServerlessCacheEndpointAddress       = elasticacheLabelServerlessCacheEndpoint + "address"
	elasticacheLabelServerlessCacheEndpointPort          = elasticacheLabelServerlessCacheEndpoint + "port"
	elasticacheLabelServerlessCacheReaderEndpointAddress = elasticacheLabelServerlessCacheEndpoint + "reader_address"
	elasticacheLabelServerlessCacheReaderEndpointPort    = elasticacheLabelServerlessCacheEndpoint + "reader_port"
	// security group membership (slice - use with index).
	elasticacheLabelServerlessCacheSecurityGroupID = elasticacheLabelServerlessCache + "security_group_id"
	// Subnet group membership (slice - use with index).
	elasticacheLabelServerlessCacheSubnetID = elasticacheLabelServerlessCache + "subnet_id"
	// cache usage limits.
	elasticacheLabelServerlessCacheCacheUsageLimit = elasticacheLabelServerlessCache + "cache_usage_limit_"
	// FIX: these two prefix constants previously lacked the trailing "_"
	// (e.g. "data_storage"), which produced malformed derived labels such as
	// "..._cache_usage_limit_data_storagemaximum". All other prefix constants
	// in this block end with "_"; these now follow the same convention.
	elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage        = elasticacheLabelServerlessCacheCacheUsageLimit + "data_storage_"
	elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMaximum = elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage + "maximum"
	elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMinimum = elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage + "minimum"
	elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageUnit    = elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage + "unit"
	elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecond           = elasticacheLabelServerlessCacheCacheUsageLimit + "ecpu_per_second_"
	elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMaximum    = elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecond + "maximum"
	elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMinimum    = elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecond + "minimum"
	// tags - create one label per tag key, with the format: elasticache_serverless_cache_tag_<tagkey>.
	elasticacheLabelServerlessCacheTag = elasticacheLabelServerlessCache + "tag_"
)
// DefaultElasticacheSDConfig is the default Elasticache SD configuration.
// Values here are the starting point for UnmarshalYAML; explicit YAML fields
// override them.
var DefaultElasticacheSDConfig = ElasticacheSDConfig{
	// NOTE(review): Port 80 mirrors the other AWS SD defaults; confirm this is
	// intended for Elasticache (Redis/Memcached listen on 6379/11211).
	Port:               80,
	RefreshInterval:    model.Duration(60 * time.Second),
	RequestConcurrency: 10,
	HTTPClientConfig:   config.DefaultHTTPClientConfig,
}
// init registers the Elasticache SD config type with the discovery registry so
// "elasticache" sections in Prometheus configuration are recognized.
func init() {
	discovery.RegisterConfig(&ElasticacheSDConfig{})
}
// ElasticacheSDConfig is the configuration for Elasticache based service discovery.
type ElasticacheSDConfig struct {
	// Region is the AWS region to query; resolved via loadRegion when empty.
	Region string `yaml:"region"`
	// Endpoint optionally overrides the Elasticache API base endpoint.
	Endpoint string `yaml:"endpoint"`
	// AccessKey/SecretKey form static credentials; when either is empty the
	// AWS SDK default credential chain is used instead.
	AccessKey string        `yaml:"access_key,omitempty"`
	SecretKey config.Secret `yaml:"secret_key,omitempty"`
	// Profile selects a shared-config profile.
	Profile string `yaml:"profile,omitempty"`
	// RoleARN, when set, is assumed via STS before calling the API.
	RoleARN string `yaml:"role_arn,omitempty"`
	// Clusters restricts discovery to these cache ARNs; empty means all.
	Clusters []string `yaml:"clusters,omitempty"`
	// Port is the port appended when building target addresses.
	Port            int            `yaml:"port"`
	RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
	// RequestConcurrency controls the maximum number of concurrent Elasticache API requests.
	RequestConcurrency int                     `yaml:"request_concurrency,omitempty"`
	HTTPClientConfig   config.HTTPClientConfig `yaml:",inline"`
}
// NewDiscovererMetrics implements discovery.Config. It wraps the shared
// refresh-metrics instantiator in the Elasticache-specific metrics type.
func (*ElasticacheSDConfig) NewDiscovererMetrics(_ prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
	m := &elasticacheMetrics{refreshMetrics: rmi}
	return m
}
// Name returns the name of the Elasticache Config.
func (*ElasticacheSDConfig) Name() string {
	return "elasticache"
}
// NewDiscoverer returns a Discoverer for the Elasticache Config.
// It delegates directly to NewElasticacheDiscovery with this config.
func (c *ElasticacheSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
	return NewElasticacheDiscovery(c, opts)
}
// UnmarshalYAML implements the yaml.Unmarshaler interface for the Elasticache Config.
// It applies defaults, decodes the YAML, resolves the AWS region, and
// validates the embedded HTTP client configuration.
func (c *ElasticacheSDConfig) UnmarshalYAML(unmarshal func(any) error) error {
	// Start from the defaults so fields absent from the YAML keep them.
	*c = DefaultElasticacheSDConfig
	// Alias type avoids recursing back into this UnmarshalYAML.
	type plain ElasticacheSDConfig
	err := unmarshal((*plain)(c))
	if err != nil {
		return err
	}
	// Resolve the region (loadRegion is a sibling helper; presumably it falls
	// back to the environment/IMDS when the region is unset — see its doc).
	c.Region, err = loadRegion(context.Background(), c.Region)
	if err != nil {
		return fmt.Errorf("could not determine AWS region: %w", err)
	}
	return c.HTTPClientConfig.Validate()
}
// elasticacheClient is the subset of the AWS Elasticache API used by this
// discoverer. It exists so tests can substitute a fake for the real
// *elasticache.Client.
type elasticacheClient interface {
	DescribeServerlessCaches(ctx context.Context, params *elasticache.DescribeServerlessCachesInput, optFns ...func(*elasticache.Options)) (*elasticache.DescribeServerlessCachesOutput, error)
	DescribeCacheClusters(ctx context.Context, params *elasticache.DescribeCacheClustersInput, optFns ...func(*elasticache.Options)) (*elasticache.DescribeCacheClustersOutput, error)
	ListTagsForResource(ctx context.Context, params *elasticache.ListTagsForResourceInput, optFns ...func(*elasticache.Options)) (*elasticache.ListTagsForResourceOutput, error)
}
// ElasticacheDiscovery periodically performs Elasticache-SD requests.
// It implements the Discoverer interface.
type ElasticacheDiscovery struct {
	*refresh.Discovery
	logger *slog.Logger
	cfg    *ElasticacheSDConfig
	// elasticacheClient is created lazily on first refresh (see
	// initElasticacheClient) and reused afterwards.
	elasticacheClient elasticacheClient
}
// NewElasticacheDiscovery returns a new ElasticacheDiscovery which periodically refreshes its targets.
// It wires d.refresh into the generic refresh.Discovery loop at the configured
// interval. The AWS client itself is not created here; that happens lazily on
// the first refresh.
func NewElasticacheDiscovery(conf *ElasticacheSDConfig, opts discovery.DiscovererOptions) (*ElasticacheDiscovery, error) {
	m, ok := opts.Metrics.(*elasticacheMetrics)
	if !ok {
		return nil, errors.New("invalid discovery metrics type")
	}
	if opts.Logger == nil {
		opts.Logger = promslog.NewNopLogger()
	}
	d := &ElasticacheDiscovery{
		logger: opts.Logger,
		cfg:    conf,
	}
	d.Discovery = refresh.NewDiscovery(
		refresh.Options{
			Logger:              opts.Logger,
			Mech:                "elasticache",
			Interval:            time.Duration(d.cfg.RefreshInterval),
			RefreshF:            d.refresh,
			MetricsInstantiator: m.refreshMetrics,
		},
	)
	return d, nil
}
// initElasticacheClient lazily constructs the AWS Elasticache client on first
// use: it builds an HTTP client from the Prometheus HTTP config, assembles AWS
// credentials (static keys, shared profile, or assumed role), and verifies the
// credentials with a short test API call. Subsequent calls are no-ops.
func (d *ElasticacheDiscovery) initElasticacheClient(ctx context.Context) error {
	// Already initialized (e.g. by a previous refresh or injected in tests).
	if d.elasticacheClient != nil {
		return nil
	}
	if d.cfg.Region == "" {
		return errors.New("region must be set for Elasticache service discovery")
	}
	// Build the HTTP client from the provided HTTPClientConfig.
	client, err := config.NewClientFromConfig(d.cfg.HTTPClientConfig, "elasticache_sd")
	if err != nil {
		return err
	}
	// Build the AWS config with the provided region.
	var configOptions []func(*awsConfig.LoadOptions) error
	configOptions = append(configOptions, awsConfig.WithRegion(d.cfg.Region))
	configOptions = append(configOptions, awsConfig.WithHTTPClient(client))
	// Only set static credentials if both access key and secret key are provided
	// Otherwise, let AWS SDK use its default credential chain
	if d.cfg.AccessKey != "" && d.cfg.SecretKey != "" {
		credProvider := credentials.NewStaticCredentialsProvider(d.cfg.AccessKey, string(d.cfg.SecretKey), "")
		configOptions = append(configOptions, awsConfig.WithCredentialsProvider(credProvider))
	}
	if d.cfg.Profile != "" {
		configOptions = append(configOptions, awsConfig.WithSharedConfigProfile(d.cfg.Profile))
	}
	cfg, err := awsConfig.LoadDefaultConfig(ctx, configOptions...)
	if err != nil {
		d.logger.Error("Failed to create AWS config", "error", err)
		return fmt.Errorf("could not create aws config: %w", err)
	}
	// If the role ARN is set, assume the role to get credentials and set the credentials provider in the config.
	if d.cfg.RoleARN != "" {
		assumeProvider := stscreds.NewAssumeRoleProvider(sts.NewFromConfig(cfg), d.cfg.RoleARN)
		cfg.Credentials = aws.NewCredentialsCache(assumeProvider)
	}
	d.elasticacheClient = elasticache.NewFromConfig(cfg, func(options *elasticache.Options) {
		if d.cfg.Endpoint != "" {
			options.BaseEndpoint = &d.cfg.Endpoint
		}
		options.HTTPClient = client
	})
	// Test credentials by making a simple API call
	// NOTE(review): on failure the client stays assigned, so the next refresh
	// skips this test; confirm that is the intended retry behavior.
	testCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	_, err = d.elasticacheClient.DescribeCacheClusters(testCtx, &elasticache.DescribeCacheClustersInput{})
	if err != nil {
		d.logger.Error("Failed to test Elasticache credentials", "error", err)
		return fmt.Errorf("elasticache credential test failed: %w", err)
	}
	return nil
}
// describeServerlessCaches calls DescribeServerlessCaches API for the given cache IDs (or all caches if no IDs are provided) and returns the list of serverless caches.
func (d *ElasticacheDiscovery) describeServerlessCaches(ctx context.Context, caches []string) ([]types.ServerlessCache, error) {
mu := &sync.Mutex{}
errg, ectx := errgroup.WithContext(ctx)
errg.SetLimit(d.cfg.RequestConcurrency)
var serverlessCaches []types.ServerlessCache
if len(caches) == 0 {
errg.Go(func() error {
var nextToken *string
for {
output, err := d.elasticacheClient.DescribeServerlessCaches(ectx, &elasticache.DescribeServerlessCachesInput{
MaxResults: aws.Int32(50),
NextToken: nextToken,
})
if err != nil {
return fmt.Errorf("failed to describe serverless caches: %w", err)
}
mu.Lock()
serverlessCaches = append(serverlessCaches, output.ServerlessCaches...)
mu.Unlock()
if output.NextToken == nil {
break
}
nextToken = output.NextToken
}
return nil
})
} else {
for _, cacheID := range caches {
errg.Go(func() error {
output, err := d.elasticacheClient.DescribeServerlessCaches(ectx, &elasticache.DescribeServerlessCachesInput{
MaxResults: aws.Int32(50),
NextToken: nil,
ServerlessCacheName: aws.String(cacheID),
})
if err != nil {
return fmt.Errorf("failed to describe serverless cache %s: %w", cacheID, err)
}
mu.Lock()
serverlessCaches = append(serverlessCaches, output.ServerlessCaches...)
mu.Unlock()
return nil
})
}
}
return serverlessCaches, errg.Wait()
}
// describeCacheClusters calls DescribeCacheClusters API for the given cache cluster IDs (or all cache clusters if no IDs are provided) and returns the list of cache clusters.
func (d *ElasticacheDiscovery) describeCacheClusters(ctx context.Context, caches []string) ([]types.CacheCluster, error) {
mu := &sync.Mutex{}
errg, ectx := errgroup.WithContext(ctx)
errg.SetLimit(d.cfg.RequestConcurrency)
showCacheClustersNotInReplicationGroupsBools := []bool{false, true}
var cacheClusters []types.CacheCluster
if len(caches) == 0 {
for _, showCacheClustersNotInReplicationGroupsBool := range showCacheClustersNotInReplicationGroupsBools {
errg.Go(func() error {
var nextToken *string
for {
output, err := d.elasticacheClient.DescribeCacheClusters(ectx, &elasticache.DescribeCacheClustersInput{
MaxRecords: aws.Int32(100),
Marker: nextToken,
ShowCacheNodeInfo: aws.Bool(true),
ShowCacheClustersNotInReplicationGroups: aws.Bool(showCacheClustersNotInReplicationGroupsBool),
})
if err != nil {
return fmt.Errorf("failed to describe cache clusters: %w", err)
}
mu.Lock()
cacheClusters = append(cacheClusters, output.CacheClusters...)
mu.Unlock()
if output.Marker == nil {
break
}
nextToken = output.Marker
}
return nil
})
}
} else {
for _, cacheID := range caches {
for _, showCacheClustersNotInReplicationGroupsBool := range showCacheClustersNotInReplicationGroupsBools {
errg.Go(func() error {
output, err := d.elasticacheClient.DescribeCacheClusters(ectx, &elasticache.DescribeCacheClustersInput{
MaxRecords: aws.Int32(100),
Marker: nil,
ShowCacheNodeInfo: aws.Bool(true),
ShowCacheClustersNotInReplicationGroups: aws.Bool(showCacheClustersNotInReplicationGroupsBool),
CacheClusterId: aws.String(cacheID),
})
if err != nil {
return fmt.Errorf("failed to describe cache cluster %s: %w", cacheID, err)
}
mu.Lock()
cacheClusters = append(cacheClusters, output.CacheClusters...)
mu.Unlock()
return nil
})
}
}
}
return cacheClusters, errg.Wait()
}
// listTagsForResource calls ListTagsForResource API for the given resource ARNs and returns a map of resource ARN to list of tags.
func (d *ElasticacheDiscovery) listTagsForResource(ctx context.Context, resourceARNs []string) (map[string][]types.Tag, error) {
mu := &sync.Mutex{}
errg, ectx := errgroup.WithContext(ctx)
errg.SetLimit(d.cfg.RequestConcurrency)
tagsByResourceARN := make(map[string][]types.Tag)
for _, resourceARN := range resourceARNs {
errg.Go(func() error {
output, err := d.elasticacheClient.ListTagsForResource(ectx, &elasticache.ListTagsForResourceInput{
ResourceName: aws.String(resourceARN),
})
if err != nil {
return fmt.Errorf("failed to list tags for resource %s: %w", resourceARN, err)
}
mu.Lock()
tagsByResourceARN[resourceARN] = output.TagList
mu.Unlock()
return nil
})
}
return tagsByResourceARN, errg.Wait()
}
// refresh is the periodic refresh function invoked by refresh.Discovery.
// It (1) ensures the AWS client exists, (2) collects the ARNs of all matching
// serverless caches and cache clusters, (3) fetches their tags, and
// (4) re-describes the caches/clusters to build one target group for the
// configured region.
//
// NOTE(review): DescribeServerlessCaches/DescribeCacheClusters are each called
// twice per refresh — once to gather ARNs for tag lookup and again to build
// targets — doubling the API calls; consider reusing the first results.
// NOTE(review): *cache.ARN / *cluster.ARN are dereferenced without nil checks;
// confirm the SDK always populates ARN on these responses.
func (d *ElasticacheDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
	err := d.initElasticacheClient(ctx)
	if err != nil {
		return nil, err
	}
	var clusters []string
	clustersMu := sync.Mutex{}
	// Split configured ARNs into serverless-cache names and cluster IDs.
	serverlessCacheIDs, cacheClusterIDs := splitCacheDeploymentOptions(d.cfg.Clusters)
	// First pass: gather ARNs of everything to be discovered, concurrently.
	clusterErrg, clusterCtx := errgroup.WithContext(ctx)
	clusterErrg.Go(func() error {
		caches, err := d.describeServerlessCaches(clusterCtx, serverlessCacheIDs)
		if err != nil {
			return fmt.Errorf("failed to describe serverless caches: %w", err)
		}
		for _, cache := range caches {
			clustersMu.Lock()
			clusters = append(clusters, *cache.ARN)
			clustersMu.Unlock()
		}
		return nil
	})
	clusterErrg.Go(func() error {
		cacheClusters, err := d.describeCacheClusters(clusterCtx, cacheClusterIDs)
		if err != nil {
			return fmt.Errorf("failed to describe cache clusters: %w", err)
		}
		for _, cluster := range cacheClusters {
			clustersMu.Lock()
			clusters = append(clusters, *cluster.ARN)
			clustersMu.Unlock()
		}
		return nil
	})
	if err := clusterErrg.Wait(); err != nil {
		return nil, err
	}
	// Fetch tags for every discovered ARN so targets can carry tag labels.
	tagsByResourceARN, err := d.listTagsForResource(ctx, clusters)
	if err != nil {
		return nil, fmt.Errorf("failed to list tags for resources: %w", err)
	}
	// Second pass: describe again and emit targets into one group per region.
	tg := &targetgroup.Group{
		Source: d.cfg.Region,
	}
	errg, ectx := errgroup.WithContext(ctx)
	errg.Go(func() error {
		caches, err := d.describeServerlessCaches(ectx, serverlessCacheIDs)
		if err != nil {
			return fmt.Errorf("failed to describe serverless caches: %w", err)
		}
		for _, cache := range caches {
			addServerlessCacheTargets(tg, &cache, tagsByResourceARN[*cache.ARN])
		}
		return nil
	})
	errg.Go(func() error {
		cacheClusters, err := d.describeCacheClusters(ectx, cacheClusterIDs)
		if err != nil {
			return fmt.Errorf("failed to describe cache clusters: %w", err)
		}
		for _, cluster := range cacheClusters {
			addCacheClusterTargets(tg, &cluster, tagsByResourceARN[*cluster.ARN])
		}
		return nil
	})
	if err := errg.Wait(); err != nil {
		return nil, err
	}
	return []*targetgroup.Group{tg}, nil
}
// splitCacheDeploymentOptions takes a list of cache ARNs and splits them into
// serverless cache IDs and cache cluster IDs based on their format.
// Serverless caches are in the format arn:aws:elasticache:<REGION>:<ACCOUNT_ID>:serverlesscache:<CACHE_NAME>
// Cache clusters are in the format arn:aws:elasticache:<REGION>:<ACCOUNT_ID>:replicationgroup:<CACHE_CLUSTER_ID>.
// Empty strings, ARNs with fewer than seven colon-separated parts, and ARNs
// with an unrecognized resource type are silently skipped.
func splitCacheDeploymentOptions(caches []string) (serverlessCacheIDs, cacheClusterIDs []string) {
	for _, cacheARN := range caches {
		if len(cacheARN) == 0 {
			continue
		}
		parts := strings.Split(cacheARN, ":")
		// A full ARN has 7 parts (indices 0-6); parts[6] is the resource ID.
		// The previous guard (len < 6) allowed a 6-part ARN through and
		// panicked with index out of range on parts[6].
		if len(parts) < 7 {
			continue
		}
		resourceType := parts[5]
		resourceID := parts[6]
		switch resourceType {
		case "serverlesscache":
			serverlessCacheIDs = append(serverlessCacheIDs, resourceID)
		case "replicationgroup":
			cacheClusterIDs = append(cacheClusterIDs, resourceID)
		}
	}
	return serverlessCacheIDs, cacheClusterIDs
}
// addServerlessCacheTargets adds targets for a serverless cache to the target group.
func addServerlessCacheTargets(tg *targetgroup.Group, cache *types.ServerlessCache, tags []types.Tag) {
labels := model.LabelSet{
elasticacheLabelDeploymentOption: model.LabelValue("serverless"),
elasticacheLabelServerlessCacheARN: model.LabelValue(*cache.ARN),
elasticacheLabelServerlessCacheName: model.LabelValue(*cache.ServerlessCacheName),
elasticacheLabelServerlessCacheStatus: model.LabelValue(*cache.Status),
elasticacheLabelServerlessCacheEngine: model.LabelValue(*cache.Engine),
elasticacheLabelServerlessCacheFullEngineVersion: model.LabelValue(*cache.FullEngineVersion),
elasticacheLabelServerlessCacheMajorEngineVersion: model.LabelValue(*cache.MajorEngineVersion),
}
if cache.Description != nil {
labels[elasticacheLabelServerlessCacheDescription] = model.LabelValue(*cache.Description)
}
if cache.CreateTime != nil {
labels[elasticacheLabelServerlessCacheCreateTime] = model.LabelValue(cache.CreateTime.Format(time.RFC3339))
}
if cache.KmsKeyId != nil {
labels[elasticacheLabelServerlessCacheKmsKeyID] = model.LabelValue(*cache.KmsKeyId)
}
if cache.UserGroupId != nil {
labels[elasticacheLabelServerlessCacheUserGroupID] = model.LabelValue(*cache.UserGroupId)
}
if cache.DailySnapshotTime != nil {
labels[elasticacheLabelServerlessCacheDailySnapshotTime] = model.LabelValue(*cache.DailySnapshotTime)
}
if cache.SnapshotRetentionLimit != nil {
labels[elasticacheLabelServerlessCacheSnapshotRetentionLimit] = model.LabelValue(strconv.Itoa(int(*cache.SnapshotRetentionLimit)))
}
if cache.Endpoint != nil {
if cache.Endpoint.Address != nil {
labels[elasticacheLabelServerlessCacheEndpointAddress] = model.LabelValue(*cache.Endpoint.Address)
}
if cache.Endpoint.Port != nil {
labels[elasticacheLabelServerlessCacheEndpointPort] = model.LabelValue(strconv.Itoa(int(*cache.Endpoint.Port)))
}
}
if cache.ReaderEndpoint != nil {
if cache.ReaderEndpoint.Address != nil {
labels[elasticacheLabelServerlessCacheReaderEndpointAddress] = model.LabelValue(*cache.ReaderEndpoint.Address)
}
if cache.ReaderEndpoint.Port != nil {
labels[elasticacheLabelServerlessCacheReaderEndpointPort] = model.LabelValue(strconv.Itoa(int(*cache.ReaderEndpoint.Port)))
}
}
for i, sgID := range cache.SecurityGroupIds {
labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelServerlessCacheSecurityGroupID, i))] = model.LabelValue(sgID)
}
for i, subnetID := range cache.SubnetIds {
labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelServerlessCacheSubnetID, i))] = model.LabelValue(subnetID)
}
if cache.CacheUsageLimits != nil {
if cache.CacheUsageLimits.DataStorage != nil {
if cache.CacheUsageLimits.DataStorage.Maximum != nil {
labels[elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMaximum] = model.LabelValue(strconv.Itoa(int(*cache.CacheUsageLimits.DataStorage.Maximum)))
}
if cache.CacheUsageLimits.DataStorage.Minimum != nil {
labels[elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMinimum] = model.LabelValue(strconv.Itoa(int(*cache.CacheUsageLimits.DataStorage.Minimum)))
}
labels[elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageUnit] = model.LabelValue(cache.CacheUsageLimits.DataStorage.Unit)
}
if cache.CacheUsageLimits.ECPUPerSecond != nil {
if cache.CacheUsageLimits.ECPUPerSecond.Maximum != nil {
labels[elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMaximum] = model.LabelValue(strconv.Itoa(int(*cache.CacheUsageLimits.ECPUPerSecond.Maximum)))
}
if cache.CacheUsageLimits.ECPUPerSecond.Minimum != nil {
labels[elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMinimum] = model.LabelValue(strconv.Itoa(int(*cache.CacheUsageLimits.ECPUPerSecond.Minimum)))
}
}
}
for _, tag := range tags {
if tag.Key != nil && tag.Value != nil {
labels[model.LabelName(elasticacheLabelServerlessCacheTag+strutil.SanitizeLabelName(*tag.Key))] = model.LabelValue(*tag.Value)
}
}
// Set the address label using the endpoint
if cache.Endpoint != nil && cache.Endpoint.Address != nil && cache.Endpoint.Port != nil {
labels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(*cache.Endpoint.Address, strconv.Itoa(int(*cache.Endpoint.Port))))
}
tg.Targets = append(tg.Targets, labels)
}
// addCacheClusterTargets adds targets for a cache cluster to the target group.
// Creates one target per cache node for individual scraping: each target
// carries the cluster-wide labels (built once, including resource tags) plus
// the node's own labels, with __address__ set to that node's endpoint.
func addCacheClusterTargets(tg *targetgroup.Group, cluster *types.CacheCluster, tags []types.Tag) {
	// Build common labels that apply to all nodes in this cluster.
	commonLabels := cacheClusterCommonLabels(cluster, tags)
	// Create one target per cache node.
	for _, node := range cluster.CacheNodes {
		// Clone the common labels so each node can add its own entries
		// without mutating its siblings' label sets.
		labels := make(model.LabelSet, len(commonLabels))
		maps.Copy(labels, commonLabels)
		addCacheNodeLabels(labels, node)
		tg.Targets = append(tg.Targets, labels)
	}
}

// cacheClusterCommonLabels builds the label set shared by every node of the
// given cache cluster, including resource tags. ARN, CacheClusterId and
// CacheClusterStatus are dereferenced without nil checks — presumably the API
// always populates them on a described cluster (TODO confirm); every other
// field is optional and guarded.
func cacheClusterCommonLabels(cluster *types.CacheCluster, tags []types.Tag) model.LabelSet {
	labels := model.LabelSet{
		elasticacheLabelDeploymentOption:   model.LabelValue("node"),
		elasticacheLabelCacheClusterARN:    model.LabelValue(*cluster.ARN),
		elasticacheLabelCacheClusterID:     model.LabelValue(*cluster.CacheClusterId),
		elasticacheLabelCacheClusterStatus: model.LabelValue(*cluster.CacheClusterStatus),
	}
	if cluster.AtRestEncryptionEnabled != nil {
		labels[elasticacheLabelCacheClusterAtRestEncryptionEnabled] = model.LabelValue(strconv.FormatBool(*cluster.AtRestEncryptionEnabled))
	}
	if cluster.AuthTokenEnabled != nil {
		labels[elasticacheLabelCacheClusterAuthTokenEnabled] = model.LabelValue(strconv.FormatBool(*cluster.AuthTokenEnabled))
	}
	if cluster.AuthTokenLastModifiedDate != nil {
		labels[elasticacheLabelCacheClusterAuthTokenLastModified] = model.LabelValue(cluster.AuthTokenLastModifiedDate.Format(time.RFC3339))
	}
	if cluster.AutoMinorVersionUpgrade != nil {
		labels[elasticacheLabelCacheClusterAutoMinorVersionUpgrade] = model.LabelValue(strconv.FormatBool(*cluster.AutoMinorVersionUpgrade))
	}
	if cluster.CacheClusterCreateTime != nil {
		labels[elasticacheLabelCacheClusterCreateTime] = model.LabelValue(cluster.CacheClusterCreateTime.Format(time.RFC3339))
	}
	if cluster.CacheNodeType != nil {
		labels[elasticacheLabelCacheClusterNodeType] = model.LabelValue(*cluster.CacheNodeType)
	}
	if cluster.CacheParameterGroup != nil && cluster.CacheParameterGroup.CacheParameterGroupName != nil {
		labels[elasticacheLabelCacheClusterParameterGroup] = model.LabelValue(*cluster.CacheParameterGroup.CacheParameterGroupName)
	}
	if cluster.CacheSubnetGroupName != nil {
		labels[elasticacheLabelCacheClusterSubnetGroupName] = model.LabelValue(*cluster.CacheSubnetGroupName)
	}
	if cluster.ClientDownloadLandingPage != nil {
		labels[elasticacheLabelCacheClusterClientDownloadLandingPage] = model.LabelValue(*cluster.ClientDownloadLandingPage)
	}
	if cluster.ConfigurationEndpoint != nil {
		if cluster.ConfigurationEndpoint.Address != nil {
			labels[elasticacheLabelCacheClusterConfigurationEndpointAddress] = model.LabelValue(*cluster.ConfigurationEndpoint.Address)
		}
		if cluster.ConfigurationEndpoint.Port != nil {
			labels[elasticacheLabelCacheClusterConfigurationEndpointPort] = model.LabelValue(strconv.Itoa(int(*cluster.ConfigurationEndpoint.Port)))
		}
	}
	if cluster.Engine != nil {
		labels[elasticacheLabelCacheClusterEngine] = model.LabelValue(*cluster.Engine)
	}
	if cluster.EngineVersion != nil {
		labels[elasticacheLabelCacheClusterEngineVersion] = model.LabelValue(*cluster.EngineVersion)
	}
	// IpDiscovery, NetworkType and TransitEncryptionMode are enum-backed
	// string types, so emptiness is tested with len rather than nil.
	if len(cluster.IpDiscovery) > 0 {
		labels[elasticacheLabelCacheClusterIPDiscovery] = model.LabelValue(cluster.IpDiscovery)
	}
	if len(cluster.NetworkType) > 0 {
		labels[elasticacheLabelCacheClusterNetworkType] = model.LabelValue(cluster.NetworkType)
	}
	if cluster.NotificationConfiguration != nil {
		if cluster.NotificationConfiguration.TopicArn != nil {
			labels[elasticacheLabelCacheClusterNotificationTopicARN] = model.LabelValue(*cluster.NotificationConfiguration.TopicArn)
		}
		if cluster.NotificationConfiguration.TopicStatus != nil {
			labels[elasticacheLabelCacheClusterNotificationTopicStatus] = model.LabelValue(*cluster.NotificationConfiguration.TopicStatus)
		}
	}
	if cluster.NumCacheNodes != nil {
		labels[elasticacheLabelCacheClusterNumCacheNodes] = model.LabelValue(strconv.Itoa(int(*cluster.NumCacheNodes)))
	}
	if cluster.PreferredAvailabilityZone != nil {
		labels[elasticacheLabelCacheClusterPreferredAvailabilityZone] = model.LabelValue(*cluster.PreferredAvailabilityZone)
	}
	if cluster.PreferredMaintenanceWindow != nil {
		labels[elasticacheLabelCacheClusterPreferredMaintenanceWindow] = model.LabelValue(*cluster.PreferredMaintenanceWindow)
	}
	if cluster.PreferredOutpostArn != nil {
		labels[elasticacheLabelCacheClusterPreferredOutpostARN] = model.LabelValue(*cluster.PreferredOutpostArn)
	}
	if cluster.ReplicationGroupId != nil {
		labels[elasticacheLabelCacheClusterReplicationGroupID] = model.LabelValue(*cluster.ReplicationGroupId)
	}
	if cluster.ReplicationGroupLogDeliveryEnabled != nil {
		labels[elasticacheLabelCacheClusterReplicationGroupLogDeliveryEnabled] = model.LabelValue(strconv.FormatBool(*cluster.ReplicationGroupLogDeliveryEnabled))
	}
	if cluster.SnapshotRetentionLimit != nil {
		labels[elasticacheLabelCacheClusterSnapshotRetentionLimit] = model.LabelValue(strconv.Itoa(int(*cluster.SnapshotRetentionLimit)))
	}
	if cluster.SnapshotWindow != nil {
		labels[elasticacheLabelCacheClusterSnapshotWindow] = model.LabelValue(*cluster.SnapshotWindow)
	}
	if cluster.TransitEncryptionEnabled != nil {
		labels[elasticacheLabelCacheClusterTransitEncryptionEnabled] = model.LabelValue(strconv.FormatBool(*cluster.TransitEncryptionEnabled))
	}
	if len(cluster.TransitEncryptionMode) > 0 {
		labels[elasticacheLabelCacheClusterTransitEncryptionMode] = model.LabelValue(cluster.TransitEncryptionMode)
	}
	// Log delivery configurations (slice): each entry gets an index suffix.
	for i, logDelivery := range cluster.LogDeliveryConfigurations {
		if len(logDelivery.DestinationType) > 0 {
			labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationDestinationType, i))] = model.LabelValue(logDelivery.DestinationType)
		}
		if len(logDelivery.LogFormat) > 0 {
			labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationLogFormat, i))] = model.LabelValue(logDelivery.LogFormat)
		}
		if len(logDelivery.LogType) > 0 {
			labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationLogType, i))] = model.LabelValue(logDelivery.LogType)
		}
		if len(logDelivery.Status) > 0 {
			labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationStatus, i))] = model.LabelValue(logDelivery.Status)
		}
		if logDelivery.Message != nil {
			labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationMessage, i))] = model.LabelValue(*logDelivery.Message)
		}
		if logDelivery.DestinationDetails != nil {
			if logDelivery.DestinationDetails.CloudWatchLogsDetails != nil && logDelivery.DestinationDetails.CloudWatchLogsDetails.LogGroup != nil {
				labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationLogGroup, i))] = model.LabelValue(*logDelivery.DestinationDetails.CloudWatchLogsDetails.LogGroup)
			}
			if logDelivery.DestinationDetails.KinesisFirehoseDetails != nil && logDelivery.DestinationDetails.KinesisFirehoseDetails.DeliveryStream != nil {
				labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationDeliveryStream, i))] = model.LabelValue(*logDelivery.DestinationDetails.KinesisFirehoseDetails.DeliveryStream)
			}
		}
	}
	// Pending modified values.
	if cluster.PendingModifiedValues != nil {
		if len(cluster.PendingModifiedValues.AuthTokenStatus) > 0 {
			labels[elasticacheLabelCacheClusterPendingModifiedValuesAuthTokenStatus] = model.LabelValue(cluster.PendingModifiedValues.AuthTokenStatus)
		}
		if cluster.PendingModifiedValues.CacheNodeType != nil {
			labels[elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeType] = model.LabelValue(*cluster.PendingModifiedValues.CacheNodeType)
		}
		if cluster.PendingModifiedValues.EngineVersion != nil {
			labels[elasticacheLabelCacheClusterPendingModifiedValuesEngineVersion] = model.LabelValue(*cluster.PendingModifiedValues.EngineVersion)
		}
		if cluster.PendingModifiedValues.NumCacheNodes != nil {
			labels[elasticacheLabelCacheClusterPendingModifiedValuesNumCacheNodes] = model.LabelValue(strconv.Itoa(int(*cluster.PendingModifiedValues.NumCacheNodes)))
		}
		if cluster.PendingModifiedValues.TransitEncryptionEnabled != nil {
			labels[elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionEnabled] = model.LabelValue(strconv.FormatBool(*cluster.PendingModifiedValues.TransitEncryptionEnabled))
		}
		if len(cluster.PendingModifiedValues.TransitEncryptionMode) > 0 {
			labels[elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionMode] = model.LabelValue(cluster.PendingModifiedValues.TransitEncryptionMode)
		}
		if len(cluster.PendingModifiedValues.CacheNodeIdsToRemove) > 0 {
			// Collapse the node-ID list into one comma-separated label value.
			labels[elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeIDsToRemove] = model.LabelValue(strings.Join(cluster.PendingModifiedValues.CacheNodeIdsToRemove, ","))
		}
	}
	// Security group membership (slice): each entry gets an index suffix.
	for i, sg := range cluster.SecurityGroups {
		if sg.SecurityGroupId != nil {
			labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterSecurityGroupMembershipID, i))] = model.LabelValue(*sg.SecurityGroupId)
		}
		if sg.Status != nil {
			labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterSecurityGroupMembershipStatus, i))] = model.LabelValue(*sg.Status)
		}
	}
	// Tags: one label per tag, with the key sanitized into a valid label name.
	for _, tag := range tags {
		if tag.Key != nil && tag.Value != nil {
			labels[model.LabelName(elasticacheLabelCacheClusterTag+strutil.SanitizeLabelName(*tag.Key))] = model.LabelValue(*tag.Value)
		}
	}
	return labels
}

// addCacheNodeLabels sets the node-specific labels on labels, and the
// __address__ label when the node exposes a complete (address and port)
// endpoint.
func addCacheNodeLabels(labels model.LabelSet, node types.CacheNode) {
	if node.CacheNodeId != nil {
		labels[elasticacheLabelCacheClusterNodeID] = model.LabelValue(*node.CacheNodeId)
	}
	if node.CacheNodeStatus != nil {
		labels[elasticacheLabelCacheClusterNodeStatus] = model.LabelValue(*node.CacheNodeStatus)
	}
	if node.CacheNodeCreateTime != nil {
		labels[elasticacheLabelCacheClusterNodeCreateTime] = model.LabelValue(node.CacheNodeCreateTime.Format(time.RFC3339))
	}
	if node.CustomerAvailabilityZone != nil {
		labels[elasticacheLabelCacheClusterNodeAZ] = model.LabelValue(*node.CustomerAvailabilityZone)
	}
	if node.CustomerOutpostArn != nil {
		labels[elasticacheLabelCacheClusterNodeCustomerOutpostARN] = model.LabelValue(*node.CustomerOutpostArn)
	}
	if node.SourceCacheNodeId != nil {
		labels[elasticacheLabelCacheClusterNodeSourceCacheNodeID] = model.LabelValue(*node.SourceCacheNodeId)
	}
	if node.ParameterGroupStatus != nil {
		labels[elasticacheLabelCacheClusterNodeParameterGroupStatus] = model.LabelValue(*node.ParameterGroupStatus)
	}
	if node.Endpoint != nil {
		if node.Endpoint.Address != nil {
			labels[elasticacheLabelCacheClusterNodeEndpointAddress] = model.LabelValue(*node.Endpoint.Address)
		}
		if node.Endpoint.Port != nil {
			labels[elasticacheLabelCacheClusterNodeEndpointPort] = model.LabelValue(strconv.Itoa(int(*node.Endpoint.Port)))
		}
		// Set the address label to this node's endpoint.
		if node.Endpoint.Address != nil && node.Endpoint.Port != nil {
			labels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(*node.Endpoint.Address, strconv.Itoa(int(*node.Endpoint.Port))))
		}
	}
}

View File

@ -0,0 +1,615 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aws
import (
"context"
"testing"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/elasticache"
"github.com/aws/aws-sdk-go-v2/service/elasticache/types"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/discovery/targetgroup"
)
// elasticacheDataStore holds the fixture data served by the mock
// Elasticache client in these tests.
type elasticacheDataStore struct {
	region           string                  // AWS region the discovery under test is configured for
	serverlessCaches []types.ServerlessCache // returned by the mock's DescribeServerlessCaches
	cacheClusters    []types.CacheCluster    // returned by the mock's DescribeCacheClusters
	tags             map[string][]types.Tag  // keyed by cache ARN; returned by the mock's ListTagsForResource
}
// TestElasticacheDiscoveryDescribeServerlessCaches checks that
// describeServerlessCaches returns the expected number of caches from the
// mock client: all stored caches when no names are given, one cache when its
// name is given, and none for an empty store.
func TestElasticacheDiscoveryDescribeServerlessCaches(t *testing.T) {
	ctx := context.Background()
	for _, tt := range []struct {
		name          string
		ecData        *elasticacheDataStore // fixture data served by the mock client
		cacheNames    []string              // names passed to describeServerlessCaches
		expectedCount int                   // expected number of caches returned
	}{
		{
			name: "MultipleCaches",
			ecData: &elasticacheDataStore{
				region: "us-west-2",
				serverlessCaches: []types.ServerlessCache{
					{
						ServerlessCacheName: strptr("test-cache"),
						ARN:                 strptr("arn:aws:elasticache:us-west-2:123456789012:serverlesscache:test-cache"),
						Status:              strptr("available"),
						Engine:              strptr("redis"),
						FullEngineVersion:   strptr("7.1"),
						CreateTime:          aws.Time(time.Now()),
						Endpoint: &types.Endpoint{
							Address: strptr("test-cache.serverless.use1.cache.amazonaws.com"),
							Port:    aws.Int32(6379),
						},
					},
					{
						ServerlessCacheName: strptr("prod-cache"),
						ARN:                 strptr("arn:aws:elasticache:us-west-2:123456789012:serverlesscache:prod-cache"),
						Status:              strptr("available"),
						Engine:              strptr("valkey"),
						FullEngineVersion:   strptr("7.2"),
						CreateTime:          aws.Time(time.Now()),
						Endpoint: &types.Endpoint{
							Address: strptr("prod-cache.serverless.use1.cache.amazonaws.com"),
							Port:    aws.Int32(6379),
						},
					},
				},
			},
			cacheNames:    []string{},
			expectedCount: 2,
		},
		{
			name: "SingleCache",
			ecData: &elasticacheDataStore{
				region: "us-east-1",
				serverlessCaches: []types.ServerlessCache{
					{
						ServerlessCacheName: strptr("single-cache"),
						ARN:                 strptr("arn:aws:elasticache:us-east-1:123456789012:serverlesscache:single-cache"),
						Status:              strptr("available"),
						Engine:              strptr("redis"),
						FullEngineVersion:   strptr("7.1"),
						CreateTime:          aws.Time(time.Now()),
					},
				},
			},
			cacheNames:    []string{"single-cache"},
			expectedCount: 1,
		},
		{
			name: "NoCaches",
			ecData: &elasticacheDataStore{
				region:           "us-east-1",
				serverlessCaches: []types.ServerlessCache{},
			},
			cacheNames:    []string{},
			expectedCount: 0,
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			client := newMockElasticacheClient(tt.ecData)
			d := &ElasticacheDiscovery{
				elasticacheClient: client,
				cfg: &ElasticacheSDConfig{
					Region:             tt.ecData.region,
					RequestConcurrency: 10,
				},
			}
			caches, err := d.describeServerlessCaches(ctx, tt.cacheNames)
			require.NoError(t, err)
			require.Len(t, caches, tt.expectedCount)
		})
	}
}
// TestElasticacheDiscoveryDescribeCacheClusters validates the mock
// Elasticache client: an unfiltered DescribeCacheClusters call must return
// every cluster in the fixture store, and an empty store must yield none.
// Note this test exercises the mock directly, not the discovery code.
func TestElasticacheDiscoveryDescribeCacheClusters(t *testing.T) {
	ctx := context.Background()
	for _, tt := range []struct {
		name          string
		ecData        *elasticacheDataStore // fixture data served by the mock client
		expectedCount int                   // expected number of clusters returned
	}{
		{
			name: "MockValidation",
			ecData: &elasticacheDataStore{
				region: "us-west-2",
				cacheClusters: []types.CacheCluster{
					{
						CacheClusterId:     strptr("test-cluster-001"),
						ARN:                strptr("arn:aws:elasticache:us-west-2:123456789012:cluster:test-cluster-001"),
						CacheClusterStatus: strptr("available"),
						Engine:             strptr("redis"),
						EngineVersion:      strptr("7.1"),
						CacheNodeType:      strptr("cache.t3.micro"),
						NumCacheNodes:      aws.Int32(1),
						ConfigurationEndpoint: &types.Endpoint{
							Address: strptr("test-cluster.abc123.cfg.use1.cache.amazonaws.com"),
							Port:    aws.Int32(6379),
						},
					},
				},
			},
			expectedCount: 1,
		},
		{
			name: "NoClusters",
			ecData: &elasticacheDataStore{
				region:        "us-east-1",
				cacheClusters: []types.CacheCluster{},
			},
			expectedCount: 0,
		},
	} {
		t.Run(tt.name, func(t *testing.T) {
			client := newMockElasticacheClient(tt.ecData)
			// Verify the mock returns the expected data for an unfiltered call.
			output, err := client.DescribeCacheClusters(ctx, &elasticache.DescribeCacheClustersInput{})
			require.NoError(t, err)
			require.Len(t, output.CacheClusters, tt.expectedCount)
		})
	}
}
// TestAddServerlessCacheTargets checks that addServerlessCacheTargets emits a
// single target whose labels include the expected serverless-cache metadata,
// tags, and __address__. Only the labels listed in expectedLabels are
// asserted; extra labels on the target are not checked.
func TestAddServerlessCacheTargets(t *testing.T) {
	testTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	tests := []struct {
		name           string
		cache          *types.ServerlessCache
		tags           []types.Tag
		expectedLabels model.LabelSet
	}{
		{
			name: "ServerlessCacheWithEndpoint",
			cache: &types.ServerlessCache{
				ServerlessCacheName: strptr("my-cache"),
				ARN:                 strptr("arn:aws:elasticache:us-east-1:123456789012:serverlesscache:my-cache"),
				Status:              strptr("available"),
				Engine:              strptr("redis"),
				FullEngineVersion:   strptr("7.1"),
				MajorEngineVersion:  strptr("7"),
				CreateTime:          aws.Time(testTime),
				Endpoint: &types.Endpoint{
					Address: strptr("my-cache.serverless.use1.cache.amazonaws.com"),
					Port:    aws.Int32(6379),
				},
				ReaderEndpoint: &types.Endpoint{
					Address: strptr("my-cache-ro.serverless.use1.cache.amazonaws.com"),
					Port:    aws.Int32(6379),
				},
				SecurityGroupIds: []string{"sg-12345"},
				SubnetIds:        []string{"subnet-abcdef"},
				CacheUsageLimits: &types.CacheUsageLimits{
					DataStorage: &types.DataStorage{
						Maximum: aws.Int32(10),
						Minimum: aws.Int32(1),
						Unit:    types.DataStorageUnitGb,
					},
					ECPUPerSecond: &types.ECPUPerSecond{
						Maximum: aws.Int32(5000),
						Minimum: aws.Int32(1000),
					},
				},
			},
			tags: []types.Tag{
				{Key: strptr("Environment"), Value: strptr("test")},
			},
			// __address__ must be built from the (writer) endpoint, not the
			// reader endpoint.
			expectedLabels: model.LabelSet{
				"__meta_elasticache_deployment_option":                      "serverless",
				"__meta_elasticache_serverless_cache_arn":                   "arn:aws:elasticache:us-east-1:123456789012:serverlesscache:my-cache",
				"__meta_elasticache_serverless_cache_name":                  "my-cache",
				"__meta_elasticache_serverless_cache_status":                "available",
				"__meta_elasticache_serverless_cache_engine":                "redis",
				"__meta_elasticache_serverless_cache_full_engine_version":   "7.1",
				"__meta_elasticache_serverless_cache_major_engine_version":  "7",
				"__meta_elasticache_serverless_cache_create_time":           "2024-01-01T00:00:00Z",
				"__meta_elasticache_serverless_cache_endpoint_address":      "my-cache.serverless.use1.cache.amazonaws.com",
				"__meta_elasticache_serverless_cache_endpoint_port":         "6379",
				"__meta_elasticache_serverless_cache_security_group_id_0":   "sg-12345",
				"__meta_elasticache_serverless_cache_subnet_id_0":           "subnet-abcdef",
				"__address__":                                               "my-cache.serverless.use1.cache.amazonaws.com:6379",
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			tg := &targetgroup.Group{
				Source: "test",
			}
			addServerlessCacheTargets(tg, tt.cache, tt.tags)
			require.Len(t, tg.Targets, 1)
			labels := tg.Targets[0]
			// Check that all expected labels are present with correct values.
			for k, v := range tt.expectedLabels {
				actualValue, exists := labels[k]
				require.True(t, exists, "label %s should exist", k)
				require.Equal(t, v, actualValue, "label %s mismatch", k)
			}
		})
	}
}
// TestAddCacheClusterTargets checks that addCacheClusterTargets emits one
// target per cache node, each combining the cluster-wide labels (ARN, engine,
// tags, ...) with that node's own labels and an __address__ built from the
// node endpoint. expectedLabels holds one label set per node, in node order;
// only the listed labels are asserted — extra labels are not checked.
func TestAddCacheClusterTargets(t *testing.T) {
	testTime := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	tests := []struct {
		name                string
		cluster             *types.CacheCluster
		tags                []types.Tag
		expectedTargetCount int
		expectedLabels      []model.LabelSet // One per node
	}{
		{
			name: "CacheClusterWithMultipleNodes",
			cluster: &types.CacheCluster{
				CacheClusterId:         strptr("my-cluster-001"),
				ARN:                    strptr("arn:aws:elasticache:us-east-1:123456789012:cluster:my-cluster-001"),
				CacheClusterStatus:     strptr("available"),
				Engine:                 strptr("redis"),
				EngineVersion:          strptr("7.1"),
				CacheNodeType:          strptr("cache.t3.micro"),
				NumCacheNodes:          aws.Int32(2),
				CacheClusterCreateTime: aws.Time(testTime),
				ConfigurationEndpoint: &types.Endpoint{
					Address: strptr("my-cluster.abc123.cfg.use1.cache.amazonaws.com"),
					Port:    aws.Int32(6379),
				},
				AtRestEncryptionEnabled:   aws.Bool(true),
				TransitEncryptionEnabled:  aws.Bool(true),
				AuthTokenEnabled:          aws.Bool(true),
				AutoMinorVersionUpgrade:   aws.Bool(true),
				CacheSubnetGroupName:      strptr("my-subnet-group"),
				PreferredAvailabilityZone: strptr("us-east-1a"),
				SecurityGroups: []types.SecurityGroupMembership{
					{
						SecurityGroupId: strptr("sg-12345"),
						Status:          strptr("active"),
					},
				},
				CacheNodes: []types.CacheNode{
					{
						CacheNodeId:              strptr("0001"),
						CacheNodeStatus:          strptr("available"),
						CacheNodeCreateTime:      aws.Time(testTime),
						CustomerAvailabilityZone: strptr("us-east-1a"),
						Endpoint: &types.Endpoint{
							Address: strptr("my-cluster-001.abc123.0001.use1.cache.amazonaws.com"),
							Port:    aws.Int32(6379),
						},
					},
					{
						CacheNodeId:              strptr("0002"),
						CacheNodeStatus:          strptr("available"),
						CacheNodeCreateTime:      aws.Time(testTime),
						CustomerAvailabilityZone: strptr("us-east-1b"),
						Endpoint: &types.Endpoint{
							Address: strptr("my-cluster-001.abc123.0002.use1.cache.amazonaws.com"),
							Port:    aws.Int32(6379),
						},
					},
				},
			},
			tags: []types.Tag{
				{Key: strptr("Environment"), Value: strptr("production")},
				{Key: strptr("Application"), Value: strptr("web-app")},
			},
			expectedTargetCount: 2,
			// The two label sets differ only in the node-specific labels
			// (node ID, AZ, endpoint, __address__); the cluster-level labels
			// are identical.
			expectedLabels: []model.LabelSet{
				{
					"__meta_elasticache_deployment_option":                                "node",
					"__meta_elasticache_cache_cluster_arn":                                "arn:aws:elasticache:us-east-1:123456789012:cluster:my-cluster-001",
					"__meta_elasticache_cache_cluster_cache_cluster_id":                   "my-cluster-001",
					"__meta_elasticache_cache_cluster_cache_cluster_status":               "available",
					"__meta_elasticache_cache_cluster_engine":                             "redis",
					"__meta_elasticache_cache_cluster_engine_version":                     "7.1",
					"__meta_elasticache_cache_cluster_cache_node_type":                    "cache.t3.micro",
					"__meta_elasticache_cache_cluster_num_cache_nodes":                    "2",
					"__meta_elasticache_cache_cluster_cache_cluster_create_time":          "2024-01-01T00:00:00Z",
					"__meta_elasticache_cache_cluster_configuration_endpoint_address":     "my-cluster.abc123.cfg.use1.cache.amazonaws.com",
					"__meta_elasticache_cache_cluster_configuration_endpoint_port":        "6379",
					"__meta_elasticache_cache_cluster_at_rest_encryption_enabled":         "true",
					"__meta_elasticache_cache_cluster_transit_encryption_enabled":         "true",
					"__meta_elasticache_cache_cluster_auth_token_enabled":                 "true",
					"__meta_elasticache_cache_cluster_auto_minor_version_upgrade":         "true",
					"__meta_elasticache_cache_cluster_cache_subnet_group_name":            "my-subnet-group",
					"__meta_elasticache_cache_cluster_preferred_availability_zone":        "us-east-1a",
					"__meta_elasticache_cache_cluster_security_group_membership_id_0":     "sg-12345",
					"__meta_elasticache_cache_cluster_security_group_membership_status_0": "active",
					"__meta_elasticache_cache_cluster_tag_Environment":                    "production",
					"__meta_elasticache_cache_cluster_tag_Application":                    "web-app",
					"__meta_elasticache_cache_cluster_node_id":                            "0001",
					"__meta_elasticache_cache_cluster_node_status":                        "available",
					"__meta_elasticache_cache_cluster_node_create_time":                   "2024-01-01T00:00:00Z",
					"__meta_elasticache_cache_cluster_node_availability_zone":             "us-east-1a",
					"__meta_elasticache_cache_cluster_node_endpoint_address":              "my-cluster-001.abc123.0001.use1.cache.amazonaws.com",
					"__meta_elasticache_cache_cluster_node_endpoint_port":                 "6379",
					"__address__":                                                         "my-cluster-001.abc123.0001.use1.cache.amazonaws.com:6379",
				},
				{
					"__meta_elasticache_deployment_option":                                "node",
					"__meta_elasticache_cache_cluster_arn":                                "arn:aws:elasticache:us-east-1:123456789012:cluster:my-cluster-001",
					"__meta_elasticache_cache_cluster_cache_cluster_id":                   "my-cluster-001",
					"__meta_elasticache_cache_cluster_cache_cluster_status":               "available",
					"__meta_elasticache_cache_cluster_engine":                             "redis",
					"__meta_elasticache_cache_cluster_engine_version":                     "7.1",
					"__meta_elasticache_cache_cluster_cache_node_type":                    "cache.t3.micro",
					"__meta_elasticache_cache_cluster_num_cache_nodes":                    "2",
					"__meta_elasticache_cache_cluster_cache_cluster_create_time":          "2024-01-01T00:00:00Z",
					"__meta_elasticache_cache_cluster_configuration_endpoint_address":     "my-cluster.abc123.cfg.use1.cache.amazonaws.com",
					"__meta_elasticache_cache_cluster_configuration_endpoint_port":        "6379",
					"__meta_elasticache_cache_cluster_at_rest_encryption_enabled":         "true",
					"__meta_elasticache_cache_cluster_transit_encryption_enabled":         "true",
					"__meta_elasticache_cache_cluster_auth_token_enabled":                 "true",
					"__meta_elasticache_cache_cluster_auto_minor_version_upgrade":         "true",
					"__meta_elasticache_cache_cluster_cache_subnet_group_name":            "my-subnet-group",
					"__meta_elasticache_cache_cluster_preferred_availability_zone":        "us-east-1a",
					"__meta_elasticache_cache_cluster_security_group_membership_id_0":     "sg-12345",
					"__meta_elasticache_cache_cluster_security_group_membership_status_0": "active",
					"__meta_elasticache_cache_cluster_tag_Environment":                    "production",
					"__meta_elasticache_cache_cluster_tag_Application":                    "web-app",
					"__meta_elasticache_cache_cluster_node_id":                            "0002",
					"__meta_elasticache_cache_cluster_node_status":                        "available",
					"__meta_elasticache_cache_cluster_node_create_time":                   "2024-01-01T00:00:00Z",
					"__meta_elasticache_cache_cluster_node_availability_zone":             "us-east-1b",
					"__meta_elasticache_cache_cluster_node_endpoint_address":              "my-cluster-001.abc123.0002.use1.cache.amazonaws.com",
					"__meta_elasticache_cache_cluster_node_endpoint_port":                 "6379",
					"__address__":                                                         "my-cluster-001.abc123.0002.use1.cache.amazonaws.com:6379",
				},
			},
		},
		{
			name: "CacheClusterWithSingleNode",
			cluster: &types.CacheCluster{
				CacheClusterId:     strptr("node-cluster-001"),
				ARN:                strptr("arn:aws:elasticache:us-east-1:123456789012:cluster:node-cluster-001"),
				CacheClusterStatus: strptr("available"),
				Engine:             strptr("redis"),
				EngineVersion:      strptr("6.2"),
				CacheNodeType:      strptr("cache.r6g.large"),
				NumCacheNodes:      aws.Int32(1),
				CacheNodes: []types.CacheNode{
					{
						CacheNodeId:              strptr("0001"),
						CacheNodeStatus:          strptr("available"),
						CacheNodeCreateTime:      aws.Time(testTime),
						CustomerAvailabilityZone: strptr("us-east-1a"),
						Endpoint: &types.Endpoint{
							Address: strptr("node-cluster-001.abc123.0001.use1.cache.amazonaws.com"),
							Port:    aws.Int32(6379),
						},
					},
				},
			},
			tags:                []types.Tag{},
			expectedTargetCount: 1,
			expectedLabels: []model.LabelSet{
				{
					"__meta_elasticache_deployment_option":                    "node",
					"__meta_elasticache_cache_cluster_arn":                    "arn:aws:elasticache:us-east-1:123456789012:cluster:node-cluster-001",
					"__meta_elasticache_cache_cluster_cache_cluster_id":       "node-cluster-001",
					"__meta_elasticache_cache_cluster_cache_cluster_status":   "available",
					"__meta_elasticache_cache_cluster_engine":                 "redis",
					"__meta_elasticache_cache_cluster_engine_version":         "6.2",
					"__meta_elasticache_cache_cluster_cache_node_type":        "cache.r6g.large",
					"__meta_elasticache_cache_cluster_num_cache_nodes":        "1",
					"__meta_elasticache_cache_cluster_node_id":                "0001",
					"__meta_elasticache_cache_cluster_node_status":            "available",
					"__meta_elasticache_cache_cluster_node_create_time":       "2024-01-01T00:00:00Z",
					"__meta_elasticache_cache_cluster_node_availability_zone": "us-east-1a",
					"__meta_elasticache_cache_cluster_node_endpoint_address":  "node-cluster-001.abc123.0001.use1.cache.amazonaws.com",
					"__meta_elasticache_cache_cluster_node_endpoint_port":     "6379",
					"__address__":                                             "node-cluster-001.abc123.0001.use1.cache.amazonaws.com:6379",
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			tg := &targetgroup.Group{
				Source: "test",
			}
			addCacheClusterTargets(tg, tt.cluster, tt.tags)
			require.Len(t, tg.Targets, tt.expectedTargetCount)
			// Check each target in node order.
			for i, expectedLabels := range tt.expectedLabels {
				labels := tg.Targets[i]
				// Check that all expected labels are present with correct values.
				for k, v := range expectedLabels {
					actualValue, exists := labels[k]
					require.True(t, exists, "label %s should exist in target %d", k, i)
					require.Equal(t, v, actualValue, "label %s mismatch in target %d", k, i)
				}
			}
		})
	}
}
// mockElasticacheClient implements the subset of the Elasticache API used by
// the discovery (DescribeServerlessCaches, DescribeCacheClusters,
// ListTagsForResource), serving canned data from an elasticacheDataStore.
type mockElasticacheClient struct {
	data *elasticacheDataStore // fixture data to serve
}

// newMockElasticacheClient returns a mock client backed by data.
func newMockElasticacheClient(data *elasticacheDataStore) *mockElasticacheClient {
	return &mockElasticacheClient{data: data}
}
// DescribeServerlessCaches mimics the AWS API against the fixture store: a
// nil ServerlessCacheName returns every stored cache, otherwise only the
// first cache whose name matches (or an empty result if none does).
func (m *mockElasticacheClient) DescribeServerlessCaches(_ context.Context, input *elasticache.DescribeServerlessCachesInput, _ ...func(*elasticache.Options)) (*elasticache.DescribeServerlessCachesOutput, error) {
	// Unfiltered call: hand back the whole store.
	if input.ServerlessCacheName == nil {
		return &elasticache.DescribeServerlessCachesOutput{
			ServerlessCaches: m.data.serverlessCaches,
		}, nil
	}
	// Filtered call: collect at most one matching cache.
	matched := []types.ServerlessCache{}
	for _, candidate := range m.data.serverlessCaches {
		if candidate.ServerlessCacheName != nil && *candidate.ServerlessCacheName == *input.ServerlessCacheName {
			matched = append(matched, candidate)
			break
		}
	}
	return &elasticache.DescribeServerlessCachesOutput{
		ServerlessCaches: matched,
	}, nil
}
// DescribeCacheClusters mimics the AWS API against the fixture store: a nil
// CacheClusterId returns every stored cluster, otherwise only the first
// cluster whose ID matches (or an empty result if none does).
func (m *mockElasticacheClient) DescribeCacheClusters(_ context.Context, input *elasticache.DescribeCacheClustersInput, _ ...func(*elasticache.Options)) (*elasticache.DescribeCacheClustersOutput, error) {
	// Unfiltered call: hand back the whole store.
	if input.CacheClusterId == nil {
		return &elasticache.DescribeCacheClustersOutput{
			CacheClusters: m.data.cacheClusters,
		}, nil
	}
	// Single-cluster lookup: collect at most one matching cluster.
	matched := []types.CacheCluster{}
	for _, candidate := range m.data.cacheClusters {
		if candidate.CacheClusterId != nil && *candidate.CacheClusterId == *input.CacheClusterId {
			matched = append(matched, candidate)
			break
		}
	}
	return &elasticache.DescribeCacheClustersOutput{
		CacheClusters: matched,
	}, nil
}
// ListTagsForResource serves the tags stored under the requested resource
// ARN, or an empty tag list when the ARN is absent or no tags are stored.
func (m *mockElasticacheClient) ListTagsForResource(_ context.Context, input *elasticache.ListTagsForResourceInput, _ ...func(*elasticache.Options)) (*elasticache.ListTagsForResourceOutput, error) {
	tagList := []types.Tag{}
	if input.ResourceName != nil {
		if stored, ok := m.data.tags[*input.ResourceName]; ok {
			tagList = stored
		}
	}
	return &elasticache.ListTagsForResourceOutput{
		TagList: tagList,
	}, nil
}
// TestSplitCacheDeploymentOptions checks that splitCacheDeploymentOptions
// partitions configured cache ARNs into serverless-cache IDs and
// cache-cluster (replication group) IDs, silently dropping malformed ARNs
// and unknown resource types.
func TestSplitCacheDeploymentOptions(t *testing.T) {
	tests := []struct {
		name                       string
		caches                     []string
		expectedServerlessCacheIDs []string
		expectedCacheClusterIDs    []string
	}{
		{
			name: "MixedARNs",
			caches: []string{
				"arn:aws:elasticache:us-east-1:123456789012:serverlesscache:my-serverless-cache",
				"arn:aws:elasticache:us-east-1:123456789012:replicationgroup:my-replication-group",
				"arn:aws:elasticache:us-west-2:123456789012:serverlesscache:prod-cache",
			},
			expectedServerlessCacheIDs: []string{"my-serverless-cache", "prod-cache"},
			expectedCacheClusterIDs:    []string{"my-replication-group"},
		},
		{
			name: "OnlyServerlessCaches",
			caches: []string{
				"arn:aws:elasticache:us-east-1:123456789012:serverlesscache:cache-1",
				"arn:aws:elasticache:us-east-1:123456789012:serverlesscache:cache-2",
			},
			expectedServerlessCacheIDs: []string{"cache-1", "cache-2"},
			expectedCacheClusterIDs:    nil,
		},
		{
			name: "OnlyReplicationGroups",
			caches: []string{
				"arn:aws:elasticache:us-east-1:123456789012:replicationgroup:cluster-1",
				"arn:aws:elasticache:us-east-1:123456789012:replicationgroup:cluster-2",
			},
			expectedServerlessCacheIDs: nil,
			expectedCacheClusterIDs:    []string{"cluster-1", "cluster-2"},
		},
		{
			name:                       "EmptyInput",
			caches:                     []string{},
			expectedServerlessCacheIDs: nil,
			expectedCacheClusterIDs:    nil,
		},
		{
			// Malformed ARNs must be ignored, not cause an error.
			name: "InvalidARNs",
			caches: []string{
				"not-an-arn",
				"arn:aws:elasticache:us-east-1",
				"",
			},
			expectedServerlessCacheIDs: nil,
			expectedCacheClusterIDs:    nil,
		},
		{
			// Well-formed ARNs with an unrecognized resource type are dropped.
			name: "UnknownResourceType",
			caches: []string{
				"arn:aws:elasticache:us-east-1:123456789012:unknown:resource-id",
			},
			expectedServerlessCacheIDs: nil,
			expectedCacheClusterIDs:    nil,
		},
		{
			name: "MixedWithInvalidARNs",
			caches: []string{
				"arn:aws:elasticache:us-east-1:123456789012:serverlesscache:valid-cache",
				"invalid-arn",
				"arn:aws:elasticache:us-east-1:123456789012:replicationgroup:valid-cluster",
				"",
				"arn:aws:elasticache:us-east-1:123456789012:unknown:ignored",
			},
			expectedServerlessCacheIDs: []string{"valid-cache"},
			expectedCacheClusterIDs:    []string{"valid-cluster"},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			serverlessCacheIDs, cacheClusterIDs := splitCacheDeploymentOptions(tt.caches)
			require.Equal(t, tt.expectedServerlessCacheIDs, serverlessCacheIDs, "serverless cache IDs mismatch")
			require.Equal(t, tt.expectedCacheClusterIDs, cacheClusterIDs, "cache cluster IDs mismatch")
		})
	}
}

View File

@ -0,0 +1,32 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aws
import (
"github.com/prometheus/prometheus/discovery"
)
// elasticacheMetrics implements discovery.DiscovererMetrics for the
// ElastiCache service discovery. The discoverer registers no metrics of
// its own; this type exists to satisfy the interface.
type elasticacheMetrics struct {
	// refreshMetrics is the shared refresh-metrics instantiator —
	// presumably passed to the refresh loop by the discoverer
	// constructor; confirm against the caller.
	refreshMetrics discovery.RefreshMetricsInstantiator
}

// Compile-time assertion that *elasticacheMetrics satisfies
// discovery.DiscovererMetrics.
var _ discovery.DiscovererMetrics = (*elasticacheMetrics)(nil)
// Register implements discovery.DiscovererMetrics. The ElastiCache
// discoverer owns no collectors, so registration is a no-op and always
// succeeds.
func (m *elasticacheMetrics) Register() error {
	// Nothing to register with the registry.
	return nil
}
// Unregister implements discovery.DiscovererMetrics. It is a no-op
// because Register added nothing to unregister.
func (m *elasticacheMetrics) Unregister() {
	// Intentionally empty: no collectors were registered.
}

View File

@ -1029,11 +1029,117 @@ The following meta labels are available on targets during [relabeling](#relabel_
* `__meta_msk_broker_node_exporter_enabled`: whether node exporter is enabled on brokers (broker nodes only)
* `__meta_msk_controller_endpoint_index`: the index of the controller endpoint (controller nodes only)
#### `elasticache`
The `elasticache` role discovers targets from AWS ElastiCache for both serverless caches and cache clusters.
**Important**: For cache clusters, one target is created per cache node. Each target includes the cluster-level labels (ARN, status, tags, etc.) and node-specific labels (node ID, endpoint, availability zone, etc.). The `__address__` label is set to the individual node's endpoint address and port.
For serverless caches, one target is created per serverless cache, with the `__address__` label set to the serverless cache endpoint.
The IAM credentials used must have the following permissions to discover scrape targets:
- `elasticache:DescribeServerlessCaches`
- `elasticache:DescribeCacheClusters`
- `elasticache:ListTagsForResource`
The following meta labels are available on targets during [relabeling](#relabel_config):
**Common labels (available on all targets):**
* `__meta_elasticache_deployment_option`: the deployment option - either `serverless` for serverless caches or `node` for cache cluster nodes
**Serverless Cache labels:**
* `__meta_elasticache_serverless_cache_arn`: the ARN of the serverless cache
* `__meta_elasticache_serverless_cache_name`: the name of the serverless cache
* `__meta_elasticache_serverless_cache_status`: the status of the serverless cache
* `__meta_elasticache_serverless_cache_engine`: the cache engine (redis or valkey)
* `__meta_elasticache_serverless_cache_full_engine_version`: the full engine version
* `__meta_elasticache_serverless_cache_major_engine_version`: the major engine version
* `__meta_elasticache_serverless_cache_description`: the description of the serverless cache
* `__meta_elasticache_serverless_cache_create_time`: the creation time in RFC3339 format
* `__meta_elasticache_serverless_cache_snapshot_retention_limit`: the snapshot retention limit in days
* `__meta_elasticache_serverless_cache_daily_snapshot_time`: the daily snapshot time
* `__meta_elasticache_serverless_cache_user_group_id`: the user group ID
* `__meta_elasticache_serverless_cache_kms_key_id`: the KMS key ID for encryption at rest
* `__meta_elasticache_serverless_cache_endpoint_address`: the endpoint address
* `__meta_elasticache_serverless_cache_endpoint_port`: the endpoint port
* `__meta_elasticache_serverless_cache_reader_endpoint_address`: the reader endpoint address
* `__meta_elasticache_serverless_cache_reader_endpoint_port`: the reader endpoint port
* `__meta_elasticache_serverless_cache_security_group_id_<index>`: security group IDs (indexed)
* `__meta_elasticache_serverless_cache_subnet_id_<index>`: subnet IDs (indexed)
* `__meta_elasticache_serverless_cache_cache_usage_limit_data_storage_maximum`: maximum data storage in the specified unit
* `__meta_elasticache_serverless_cache_cache_usage_limit_data_storage_minimum`: minimum data storage in the specified unit
* `__meta_elasticache_serverless_cache_cache_usage_limit_data_storage_unit`: unit for data storage (e.g., GB)
* `__meta_elasticache_serverless_cache_cache_usage_limit_ecpu_per_second_maximum`: maximum ECPU per second
* `__meta_elasticache_serverless_cache_cache_usage_limit_ecpu_per_second_minimum`: minimum ECPU per second
* `__meta_elasticache_serverless_cache_tag_<tagkey>`: each serverless cache tag value, keyed by tag name
**Cache Cluster labels:**
* `__meta_elasticache_cache_cluster_arn`: the ARN of the cache cluster
* `__meta_elasticache_cache_cluster_cache_cluster_id`: the cache cluster ID
* `__meta_elasticache_cache_cluster_cache_cluster_status`: the status of the cache cluster
* `__meta_elasticache_cache_cluster_engine`: the cache engine (redis or memcached)
* `__meta_elasticache_cache_cluster_engine_version`: the engine version
* `__meta_elasticache_cache_cluster_cache_node_type`: the cache node type (e.g., cache.t3.micro)
* `__meta_elasticache_cache_cluster_num_cache_nodes`: the number of cache nodes
* `__meta_elasticache_cache_cluster_cache_cluster_create_time`: the creation time in RFC3339 format
* `__meta_elasticache_cache_cluster_at_rest_encryption_enabled`: whether encryption at rest is enabled
* `__meta_elasticache_cache_cluster_transit_encryption_enabled`: whether encryption in transit is enabled
* `__meta_elasticache_cache_cluster_transit_encryption_mode`: the transit encryption mode
* `__meta_elasticache_cache_cluster_auth_token_enabled`: whether auth token is enabled
* `__meta_elasticache_cache_cluster_auth_token_last_modified`: the last modification time of auth token
* `__meta_elasticache_cache_cluster_auto_minor_version_upgrade`: whether auto minor version upgrade is enabled
* `__meta_elasticache_cache_cluster_cache_parameter_group`: the cache parameter group name
* `__meta_elasticache_cache_cluster_cache_subnet_group_name`: the cache subnet group name
* `__meta_elasticache_cache_cluster_client_download_landing_page`: the client download landing page URL
* `__meta_elasticache_cache_cluster_ip_discovery`: the IP discovery mode (ipv4 or ipv6)
* `__meta_elasticache_cache_cluster_network_type`: the network type (ipv4, ipv6, or dual_stack)
* `__meta_elasticache_cache_cluster_preferred_availability_zone`: the preferred availability zone
* `__meta_elasticache_cache_cluster_preferred_maintenance_window`: the preferred maintenance window
* `__meta_elasticache_cache_cluster_preferred_outpost_arn`: the preferred outpost ARN
* `__meta_elasticache_cache_cluster_replication_group_id`: the replication group ID (for Redis clusters that are part of a replication group)
* `__meta_elasticache_cache_cluster_replication_group_log_delivery_enabled`: whether log delivery is enabled for the replication group
* `__meta_elasticache_cache_cluster_snapshot_retention_limit`: the snapshot retention limit in days
* `__meta_elasticache_cache_cluster_snapshot_window`: the daily snapshot window
* `__meta_elasticache_cache_cluster_configuration_endpoint_address`: the configuration endpoint address (cluster mode enabled only)
* `__meta_elasticache_cache_cluster_configuration_endpoint_port`: the configuration endpoint port (cluster mode enabled only)
* `__meta_elasticache_cache_cluster_notification_topic_arn`: the SNS topic ARN for notifications
* `__meta_elasticache_cache_cluster_notification_topic_status`: the SNS topic status
* `__meta_elasticache_cache_cluster_log_delivery_configuration_destination_type_<index>`: log delivery destination type (cloudwatch-logs or kinesis-firehose)
* `__meta_elasticache_cache_cluster_log_delivery_configuration_log_format_<index>`: log format (text or json)
* `__meta_elasticache_cache_cluster_log_delivery_configuration_log_type_<index>`: log type (slow-log or engine-log)
* `__meta_elasticache_cache_cluster_log_delivery_configuration_status_<index>`: log delivery status
* `__meta_elasticache_cache_cluster_log_delivery_configuration_message_<index>`: log delivery message
* `__meta_elasticache_cache_cluster_log_delivery_configuration_log_group_<index>`: CloudWatch log group name (cloudwatch-logs destination only)
* `__meta_elasticache_cache_cluster_log_delivery_configuration_delivery_stream_<index>`: Kinesis Firehose delivery stream name (kinesis-firehose destination only)
* `__meta_elasticache_cache_cluster_pending_modified_values_auth_token_status`: pending auth token status
* `__meta_elasticache_cache_cluster_pending_modified_values_cache_node_type`: pending cache node type change
* `__meta_elasticache_cache_cluster_pending_modified_values_engine_version`: pending engine version upgrade
* `__meta_elasticache_cache_cluster_pending_modified_values_num_cache_nodes`: pending number of cache nodes
* `__meta_elasticache_cache_cluster_pending_modified_values_transit_encryption_enabled`: pending transit encryption status
* `__meta_elasticache_cache_cluster_pending_modified_values_transit_encryption_mode`: pending transit encryption mode
* `__meta_elasticache_cache_cluster_pending_modified_values_cache_node_ids_to_remove`: comma-separated list of cache node IDs to be removed
* `__meta_elasticache_cache_cluster_security_group_membership_id_<index>`: security group ID (indexed)
* `__meta_elasticache_cache_cluster_security_group_membership_status_<index>`: security group status (indexed)
* `__meta_elasticache_cache_cluster_node_id`: cache node ID
* `__meta_elasticache_cache_cluster_node_status`: cache node status
* `__meta_elasticache_cache_cluster_node_create_time`: cache node creation time in RFC3339 format
* `__meta_elasticache_cache_cluster_node_availability_zone`: cache node availability zone
* `__meta_elasticache_cache_cluster_node_customer_outpost_arn`: cache node outpost ARN
* `__meta_elasticache_cache_cluster_node_source_cache_node_id`: source cache node ID for replication
* `__meta_elasticache_cache_cluster_node_parameter_group_status`: parameter group status
* `__meta_elasticache_cache_cluster_node_endpoint_address`: cache node endpoint address
* `__meta_elasticache_cache_cluster_node_endpoint_port`: cache node endpoint port
* `__meta_elasticache_cache_cluster_tag_<tagkey>`: each cache cluster tag value, keyed by tag name
See below for the configuration options for AWS discovery:
```yaml
# The AWS role to use for service discovery.
# Must be one of: ec2, lightsail, ecs, or msk.
# Must be one of: ec2, lightsail, ecs, msk, or elasticache.
role: <string>
# The AWS region. If blank, the region from the instance metadata is used.
@ -1069,8 +1175,9 @@ filters:
[ - name: <string>
values: <string>, [...] ]
# List of ECS or MSK cluster ARNs (ecs and msk roles only) to discover. If empty, all clusters in the region are discovered.
# This can significantly improve performance when you only need to monitor specific clusters.
# List of ECS, ElastiCache, or MSK cluster identifiers (ecs, elasticache, and msk roles only) to discover.
# A list of cluster ARNs to discover. If empty, all clusters in the region are discovered.
# This can significantly improve performance when you only need to monitor specific clusters/caches.
[ clusters: [<string>, ...] ]
# HTTP client settings, including authentication methods (such as basic auth and

1
go.mod
View File

@ -16,6 +16,7 @@ require (
github.com/aws/aws-sdk-go-v2/credentials v1.19.9
github.com/aws/aws-sdk-go-v2/service/ec2 v1.290.0
github.com/aws/aws-sdk-go-v2/service/ecs v1.72.0
github.com/aws/aws-sdk-go-v2/service/elasticache v1.51.9
github.com/aws/aws-sdk-go-v2/service/kafka v1.48.0
github.com/aws/aws-sdk-go-v2/service/lightsail v1.50.11
github.com/aws/aws-sdk-go-v2/service/sts v1.41.6

2
go.sum
View File

@ -67,6 +67,8 @@ github.com/aws/aws-sdk-go-v2/service/ec2 v1.290.0 h1:Ub4CvLWf8wEQ7/pEiqXM9tTsHXf
github.com/aws/aws-sdk-go-v2/service/ec2 v1.290.0/go.mod h1:Uy+C+Sc58jozdoL1McQr8bDsEvNFx+/nBY+vpO1HVUY=
github.com/aws/aws-sdk-go-v2/service/ecs v1.72.0 h1:hggRKpv26DpYMOik3wWo1Ty5MkANoXhNobjfWpC3G4M=
github.com/aws/aws-sdk-go-v2/service/ecs v1.72.0/go.mod h1:pMlGFDpHoLTJOIZHGdJOAWmi+xeIlQXuFTuQxs1epYE=
github.com/aws/aws-sdk-go-v2/service/elasticache v1.51.9 h1:hTgZLyNoDWphZUtTtcvQh0LP6TZO0mtdSfZK/GObDLk=
github.com/aws/aws-sdk-go-v2/service/elasticache v1.51.9/go.mod h1:91RkIYy9ubykxB50XGYDsbljLZnrZ6rp/Urt4rZrbwQ=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4 h1:0ryTNEdJbzUCEWkVXEXoqlXV72J5keC1GvILMOuD00E=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.4/go.mod h1:HQ4qwNZh32C3CBeO6iJLQlgtMzqeG17ziAA/3KDJFow=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.17 h1:RuNSMoozM8oXlgLG/n6WLaFGoea7/CddrCfIiSA+xdY=