prometheus/discovery/aws/elasticache.go
Matthieu MOREL 45b9329e68
chore: fix emptyStringTest issues from gocritic (#18226)
Signed-off-by: Matthieu MOREL <matthieu.morel35@gmail.com>
2026-03-04 08:24:50 +01:00

908 lines
42 KiB
Go

// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aws
import (
"context"
"errors"
"fmt"
"log/slog"
"maps"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
"github.com/aws/aws-sdk-go-v2/service/elasticache"
"github.com/aws/aws-sdk-go-v2/service/elasticache/types"
"github.com/aws/aws-sdk-go-v2/service/sts"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"golang.org/x/sync/errgroup"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/refresh"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/strutil"
)
// Meta labels attached to targets discovered by the Elasticache SD. Every name
// below is built from model.MetaLabelPrefix, so all of them are meta labels.
const (
// elasticacheLabel is the common prefix of every label emitted by this discoverer.
elasticacheLabel = model.MetaLabelPrefix + "elasticache_"
// elasticacheLabelDeploymentOption is set to "serverless" for serverless cache
// targets and to "node" for cache cluster targets.
elasticacheLabelDeploymentOption = elasticacheLabel + "deployment_option"
// cache cluster.
elasticacheLabelCacheCluster = elasticacheLabel + "cache_cluster_"
elasticacheLabelCacheClusterARN = elasticacheLabelCacheCluster + "arn"
elasticacheLabelCacheClusterAtRestEncryptionEnabled = elasticacheLabelCacheCluster + "at_rest_encryption_enabled"
elasticacheLabelCacheClusterAuthTokenEnabled = elasticacheLabelCacheCluster + "auth_token_enabled"
elasticacheLabelCacheClusterAuthTokenLastModified = elasticacheLabelCacheCluster + "auth_token_last_modified"
elasticacheLabelCacheClusterAutoMinorVersionUpgrade = elasticacheLabelCacheCluster + "auto_minor_version_upgrade"
// NOTE(review): the next three repeat "cache_cluster" after the prefix
// (e.g. ..._cache_cluster_cache_cluster_id). Renaming them would change
// user-visible label names, so they are left as-is.
elasticacheLabelCacheClusterCreateTime = elasticacheLabelCacheCluster + "cache_cluster_create_time"
elasticacheLabelCacheClusterID = elasticacheLabelCacheCluster + "cache_cluster_id"
elasticacheLabelCacheClusterStatus = elasticacheLabelCacheCluster + "cache_cluster_status"
elasticacheLabelCacheClusterNodeType = elasticacheLabelCacheCluster + "cache_node_type"
elasticacheLabelCacheClusterParameterGroup = elasticacheLabelCacheCluster + "cache_parameter_group"
elasticacheLabelCacheClusterSubnetGroupName = elasticacheLabelCacheCluster + "cache_subnet_group_name"
elasticacheLabelCacheClusterClientDownloadLandingPage = elasticacheLabelCacheCluster + "client_download_landing_page"
elasticacheLabelCacheClusterEngine = elasticacheLabelCacheCluster + "engine"
elasticacheLabelCacheClusterEngineVersion = elasticacheLabelCacheCluster + "engine_version"
elasticacheLabelCacheClusterIPDiscovery = elasticacheLabelCacheCluster + "ip_discovery"
elasticacheLabelCacheClusterNetworkType = elasticacheLabelCacheCluster + "network_type"
elasticacheLabelCacheClusterNumCacheNodes = elasticacheLabelCacheCluster + "num_cache_nodes"
elasticacheLabelCacheClusterPreferredAvailabilityZone = elasticacheLabelCacheCluster + "preferred_availability_zone"
elasticacheLabelCacheClusterPreferredMaintenanceWindow = elasticacheLabelCacheCluster + "preferred_maintenance_window"
elasticacheLabelCacheClusterPreferredOutpostARN = elasticacheLabelCacheCluster + "preferred_outpost_arn"
elasticacheLabelCacheClusterReplicationGroupID = elasticacheLabelCacheCluster + "replication_group_id"
elasticacheLabelCacheClusterReplicationGroupLogDeliveryEnabled = elasticacheLabelCacheCluster + "replication_group_log_delivery_enabled"
elasticacheLabelCacheClusterSnapshotRetentionLimit = elasticacheLabelCacheCluster + "snapshot_retention_limit"
elasticacheLabelCacheClusterSnapshotWindow = elasticacheLabelCacheCluster + "snapshot_window"
elasticacheLabelCacheClusterTransitEncryptionEnabled = elasticacheLabelCacheCluster + "transit_encryption_enabled"
elasticacheLabelCacheClusterTransitEncryptionMode = elasticacheLabelCacheCluster + "transit_encryption_mode"
// configuration endpoint.
elasticacheLabelCacheClusterConfigurationEndpoint = elasticacheLabelCacheCluster + "configuration_endpoint_"
elasticacheLabelCacheClusterConfigurationEndpointAddress = elasticacheLabelCacheClusterConfigurationEndpoint + "address"
elasticacheLabelCacheClusterConfigurationEndpointPort = elasticacheLabelCacheClusterConfigurationEndpoint + "port"
// notification.
elasticacheLabelCacheClusterNotification = elasticacheLabelCacheCluster + "notification_"
elasticacheLabelCacheClusterNotificationTopicARN = elasticacheLabelCacheClusterNotification + "topic_arn"
elasticacheLabelCacheClusterNotificationTopicStatus = elasticacheLabelCacheClusterNotification + "topic_status"
// log delivery configuration (slice - use with index).
elasticacheLabelCacheClusterLogDeliveryConfiguration = elasticacheLabelCacheCluster + "log_delivery_configuration_"
elasticacheLabelCacheClusterLogDeliveryConfigurationDestinationType = elasticacheLabelCacheClusterLogDeliveryConfiguration + "destination_type"
elasticacheLabelCacheClusterLogDeliveryConfigurationLogFormat = elasticacheLabelCacheClusterLogDeliveryConfiguration + "log_format"
elasticacheLabelCacheClusterLogDeliveryConfigurationLogType = elasticacheLabelCacheClusterLogDeliveryConfiguration + "log_type"
elasticacheLabelCacheClusterLogDeliveryConfigurationStatus = elasticacheLabelCacheClusterLogDeliveryConfiguration + "status"
elasticacheLabelCacheClusterLogDeliveryConfigurationMessage = elasticacheLabelCacheClusterLogDeliveryConfiguration + "message"
elasticacheLabelCacheClusterLogDeliveryConfigurationLogGroup = elasticacheLabelCacheClusterLogDeliveryConfiguration + "log_group"
elasticacheLabelCacheClusterLogDeliveryConfigurationDeliveryStream = elasticacheLabelCacheClusterLogDeliveryConfiguration + "delivery_stream"
// pending modified values.
elasticacheLabelCacheClusterPendingModifiedValues = elasticacheLabelCacheCluster + "pending_modified_values_"
elasticacheLabelCacheClusterPendingModifiedValuesAuthTokenStatus = elasticacheLabelCacheClusterPendingModifiedValues + "auth_token_status"
elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeType = elasticacheLabelCacheClusterPendingModifiedValues + "cache_node_type"
elasticacheLabelCacheClusterPendingModifiedValuesEngineVersion = elasticacheLabelCacheClusterPendingModifiedValues + "engine_version"
elasticacheLabelCacheClusterPendingModifiedValuesNumCacheNodes = elasticacheLabelCacheClusterPendingModifiedValues + "num_cache_nodes"
elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionEnabled = elasticacheLabelCacheClusterPendingModifiedValues + "transit_encryption_enabled"
elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionMode = elasticacheLabelCacheClusterPendingModifiedValues + "transit_encryption_mode"
elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeIDsToRemove = elasticacheLabelCacheClusterPendingModifiedValues + "cache_node_ids_to_remove"
// security group membership (slice - use with index).
elasticacheLabelCacheClusterSecurityGroupMembership = elasticacheLabelCacheCluster + "security_group_membership_"
elasticacheLabelCacheClusterSecurityGroupMembershipID = elasticacheLabelCacheClusterSecurityGroupMembership + "id"
elasticacheLabelCacheClusterSecurityGroupMembershipStatus = elasticacheLabelCacheClusterSecurityGroupMembership + "status"
// tags - create one label per tag key, with the format: elasticache_cache_cluster_tag_<tagkey>.
elasticacheLabelCacheClusterTag = elasticacheLabelCacheCluster + "tag_"
// node.
elasticacheLabelCacheClusterNode = elasticacheLabelCacheCluster + "node_"
elasticacheLabelCacheClusterNodeCreateTime = elasticacheLabelCacheClusterNode + "create_time"
elasticacheLabelCacheClusterNodeID = elasticacheLabelCacheClusterNode + "id"
elasticacheLabelCacheClusterNodeStatus = elasticacheLabelCacheClusterNode + "status"
elasticacheLabelCacheClusterNodeAZ = elasticacheLabelCacheClusterNode + "availability_zone"
elasticacheLabelCacheClusterNodeCustomerOutpostARN = elasticacheLabelCacheClusterNode + "customer_outpost_arn"
elasticacheLabelCacheClusterNodeSourceCacheNodeID = elasticacheLabelCacheClusterNode + "source_cache_node_id"
elasticacheLabelCacheClusterNodeParameterGroupStatus = elasticacheLabelCacheClusterNode + "parameter_group_status"
// endpoint.
elasticacheLabelCacheClusterNodeEndpoint = elasticacheLabelCacheClusterNode + "endpoint_"
elasticacheLabelCacheClusterNodeEndpointAddress = elasticacheLabelCacheClusterNodeEndpoint + "address"
elasticacheLabelCacheClusterNodeEndpointPort = elasticacheLabelCacheClusterNodeEndpoint + "port"
// serverless cache.
elasticacheLabelServerlessCache = elasticacheLabel + "serverless_cache_"
elasticacheLabelServerlessCacheARN = elasticacheLabelServerlessCache + "arn"
elasticacheLabelServerlessCacheName = elasticacheLabelServerlessCache + "name"
elasticacheLabelServerlessCacheCreateTime = elasticacheLabelServerlessCache + "create_time"
elasticacheLabelServerlessCacheDescription = elasticacheLabelServerlessCache + "description"
elasticacheLabelServerlessCacheEngine = elasticacheLabelServerlessCache + "engine"
elasticacheLabelServerlessCacheFullEngineVersion = elasticacheLabelServerlessCache + "full_engine_version"
elasticacheLabelServerlessCacheMajorEngineVersion = elasticacheLabelServerlessCache + "major_engine_version"
elasticacheLabelServerlessCacheStatus = elasticacheLabelServerlessCache + "status"
elasticacheLabelServerlessCacheKmsKeyID = elasticacheLabelServerlessCache + "kms_key_id"
elasticacheLabelServerlessCacheUserGroupID = elasticacheLabelServerlessCache + "user_group_id"
elasticacheLabelServerlessCacheDailySnapshotTime = elasticacheLabelServerlessCache + "daily_snapshot_time"
elasticacheLabelServerlessCacheSnapshotRetentionLimit = elasticacheLabelServerlessCache + "snapshot_retention_limit"
// endpoint.
elasticacheLabelServerlessCacheEndpoint = elasticacheLabelServerlessCache + "endpoint_"
elasticacheLabelServerlessCacheEndpointAddress = elasticacheLabelServerlessCacheEndpoint + "address"
elasticacheLabelServerlessCacheEndpointPort = elasticacheLabelServerlessCacheEndpoint + "port"
elasticacheLabelServerlessCacheReaderEndpointAddress = elasticacheLabelServerlessCacheEndpoint + "reader_address"
elasticacheLabelServerlessCacheReaderEndpointPort = elasticacheLabelServerlessCacheEndpoint + "reader_port"
// security group membership (slice - use with index).
// The emitted label name is this constant followed by "_<index>".
elasticacheLabelServerlessCacheSecurityGroupID = elasticacheLabelServerlessCache + "security_group_id"
// Subnet group membership (slice - use with index).
// The emitted label name is this constant followed by "_<index>".
elasticacheLabelServerlessCacheSubnetID = elasticacheLabelServerlessCache + "subnet_id"
// cache usage limits.
// NOTE(review): the "data_storage" and "ecpu_per_second" sub-prefixes below have
// no trailing "_", unlike every other sub-prefix in this block. The resulting
// label names are e.g. ..._cache_usage_limit_data_storagemaximum. Adding the
// separator would rename user-visible labels, so confirm before changing it.
elasticacheLabelServerlessCacheCacheUsageLimit = elasticacheLabelServerlessCache + "cache_usage_limit_"
elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage = elasticacheLabelServerlessCacheCacheUsageLimit + "data_storage"
elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMaximum = elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage + "maximum"
elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMinimum = elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage + "minimum"
elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageUnit = elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorage + "unit"
elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecond = elasticacheLabelServerlessCacheCacheUsageLimit + "ecpu_per_second"
elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMaximum = elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecond + "maximum"
elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMinimum = elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecond + "minimum"
// tags - create one label per tag key, with the format: elasticache_serverless_cache_tag_<tagkey>.
// Tag keys are passed through strutil.SanitizeLabelName before being appended.
elasticacheLabelServerlessCacheTag = elasticacheLabelServerlessCache + "tag_"
)
// DefaultElasticacheSDConfig is the default Elasticache SD configuration.
// UnmarshalYAML starts from these values before applying the user's YAML:
// targets are refreshed every minute, at most 10 API requests run
// concurrently, and the default Prometheus HTTP client settings are used.
var DefaultElasticacheSDConfig = ElasticacheSDConfig{
	HTTPClientConfig:   config.DefaultHTTPClientConfig,
	RequestConcurrency: 10,
	RefreshInterval:    model.Duration(time.Minute),
	Port:               80,
}
// init registers the Elasticache SD configuration type with the discovery package.
func init() {
	discovery.RegisterConfig(new(ElasticacheSDConfig))
}
// ElasticacheSDConfig is the configuration for Elasticache based service discovery.
type ElasticacheSDConfig struct {
// Region is the AWS region to query. UnmarshalYAML resolves it via loadRegion,
// and initElasticacheClient refuses to build a client when it is empty.
Region string `yaml:"region"`
// Endpoint, when non-empty, overrides the ElastiCache API base endpoint.
Endpoint string `yaml:"endpoint"`
// AccessKey and SecretKey are used as static credentials only when BOTH are
// set; otherwise the AWS SDK's default credential chain is used.
AccessKey string `yaml:"access_key,omitempty"`
SecretKey config.Secret `yaml:"secret_key,omitempty"`
// Profile, when non-empty, selects a named profile from the shared AWS config.
Profile string `yaml:"profile,omitempty"`
// RoleARN, when non-empty, is assumed through STS, and the resulting
// credentials replace those loaded above.
RoleARN string `yaml:"role_arn,omitempty"`
// Clusters optionally lists ElastiCache ARNs to discover (serverlesscache and
// replicationgroup resource types are recognised). With no recognised ARN of
// a given type, all resources of that type are listed.
Clusters []string `yaml:"clusters,omitempty"`
// Port defaults to 80 (see DefaultElasticacheSDConfig).
// NOTE(review): not read by the target-building code visible here — confirm where it is used.
Port int `yaml:"port"`
// RefreshInterval is the interval at which targets are re-discovered.
RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
// RequestConcurrency controls the maximum number of concurrent Elasticache API requests.
RequestConcurrency int `yaml:"request_concurrency,omitempty"`
// HTTPClientConfig configures the HTTP client used for AWS API calls; it is
// validated by UnmarshalYAML.
HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
}
// NewDiscovererMetrics implements discovery.Config.
// The registerer is unused; the discoverer only carries the refresh metrics
// instantiator, which NewElasticacheDiscovery hands to the refresh loop.
func (*ElasticacheSDConfig) NewDiscovererMetrics(_ prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
	metrics := &elasticacheMetrics{}
	metrics.refreshMetrics = rmi
	return metrics
}
// Name returns the name of the Elasticache Config ("elasticache").
func (*ElasticacheSDConfig) Name() string {
	const name = "elasticache"
	return name
}
// NewDiscoverer returns a Discoverer for the Elasticache Config.
//
// On failure it returns a nil interface value. Returning the
// (*ElasticacheDiscovery, error) pair directly would wrap a nil pointer in a
// non-nil discovery.Discoverer, which defeats `d == nil` checks by callers.
func (c *ElasticacheSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
	d, err := NewElasticacheDiscovery(c, opts)
	if err != nil {
		return nil, err
	}
	return d, nil
}
// UnmarshalYAML implements the yaml.Unmarshaler interface for the Elasticache Config.
//
// Decoding starts from DefaultElasticacheSDConfig, so omitted fields keep their
// defaults. The region is then resolved through loadRegion, and the embedded
// HTTP client configuration is validated last.
func (c *ElasticacheSDConfig) UnmarshalYAML(unmarshal func(any) error) error {
	*c = DefaultElasticacheSDConfig

	// The local type has no methods, which prevents unmarshal from recursing
	// back into this UnmarshalYAML.
	type plain ElasticacheSDConfig
	if err := unmarshal((*plain)(c)); err != nil {
		return err
	}

	region, err := loadRegion(context.Background(), c.Region)
	c.Region = region
	if err != nil {
		return fmt.Errorf("could not determine AWS region: %w", err)
	}

	return c.HTTPClientConfig.Validate()
}
// elasticacheClient is the subset of the ElastiCache API used by this
// discoverer. The SDK client built by elasticache.NewFromConfig satisfies it.
type elasticacheClient interface {
	// DescribeCacheClusters lists node-based cache clusters.
	DescribeCacheClusters(ctx context.Context, params *elasticache.DescribeCacheClustersInput, optFns ...func(*elasticache.Options)) (*elasticache.DescribeCacheClustersOutput, error)
	// DescribeServerlessCaches lists serverless caches.
	DescribeServerlessCaches(ctx context.Context, params *elasticache.DescribeServerlessCachesInput, optFns ...func(*elasticache.Options)) (*elasticache.DescribeServerlessCachesOutput, error)
	// ListTagsForResource returns the tags attached to a single resource ARN.
	ListTagsForResource(ctx context.Context, params *elasticache.ListTagsForResourceInput, optFns ...func(*elasticache.Options)) (*elasticache.ListTagsForResourceOutput, error)
}
// ElasticacheDiscovery periodically performs Elasticache-SD requests.
// It implements the Discoverer interface.
type ElasticacheDiscovery struct {
// Discovery runs the periodic refresh loop, calling refresh on each tick.
*refresh.Discovery
logger *slog.Logger
cfg *ElasticacheSDConfig
// elasticacheClient is built lazily by initElasticacheClient (called from
// refresh) while it is nil; once non-nil it is reused as-is.
elasticacheClient elasticacheClient
}
// NewElasticacheDiscovery returns a new ElasticacheDiscovery which periodically
// refreshes its targets.
//
// opts.Metrics must be the value produced by NewDiscovererMetrics; any other
// type is rejected. A nil opts.Logger is replaced by a no-op logger. The AWS
// client is not created here but lazily on the first refresh.
func NewElasticacheDiscovery(conf *ElasticacheSDConfig, opts discovery.DiscovererOptions) (*ElasticacheDiscovery, error) {
	metrics, ok := opts.Metrics.(*elasticacheMetrics)
	if !ok {
		return nil, errors.New("invalid discovery metrics type")
	}

	logger := opts.Logger
	if logger == nil {
		logger = promslog.NewNopLogger()
	}

	d := &ElasticacheDiscovery{cfg: conf, logger: logger}
	d.Discovery = refresh.NewDiscovery(refresh.Options{
		Logger:              logger,
		Mech:                "elasticache",
		Interval:            time.Duration(conf.RefreshInterval),
		RefreshF:            d.refresh,
		MetricsInstantiator: metrics.refreshMetrics,
	})
	return d, nil
}
// initElasticacheClient builds the ElastiCache API client on first use and
// verifies that it can make a call.
//
// If d.elasticacheClient is already set, it returns immediately. Otherwise it:
//   - builds an HTTP client from cfg.HTTPClientConfig;
//   - loads the AWS config for cfg.Region, using static credentials only when
//     both AccessKey and SecretKey are set, and the named Profile when given;
//   - optionally wraps the credentials in an STS assume-role provider for
//     cfg.RoleARN;
//   - probes the client with a DescribeCacheClusters call bounded by a
//     10 second timeout.
//
// The client is stored in d.elasticacheClient only after the probe succeeds.
// A failed probe therefore leaves the field nil, and the next refresh retries
// the whole initialisation instead of silently reusing an unverified client.
func (d *ElasticacheDiscovery) initElasticacheClient(ctx context.Context) error {
	if d.elasticacheClient != nil {
		return nil
	}

	if d.cfg.Region == "" {
		return errors.New("region must be set for Elasticache service discovery")
	}

	// Build the HTTP client from the provided HTTPClientConfig.
	client, err := config.NewClientFromConfig(d.cfg.HTTPClientConfig, "elasticache_sd")
	if err != nil {
		return err
	}

	configOptions := []func(*awsConfig.LoadOptions) error{
		awsConfig.WithRegion(d.cfg.Region),
		awsConfig.WithHTTPClient(client),
	}
	// Only set static credentials if both access key and secret key are provided;
	// otherwise let the AWS SDK use its default credential chain.
	if d.cfg.AccessKey != "" && d.cfg.SecretKey != "" {
		credProvider := credentials.NewStaticCredentialsProvider(d.cfg.AccessKey, string(d.cfg.SecretKey), "")
		configOptions = append(configOptions, awsConfig.WithCredentialsProvider(credProvider))
	}
	if d.cfg.Profile != "" {
		configOptions = append(configOptions, awsConfig.WithSharedConfigProfile(d.cfg.Profile))
	}

	cfg, err := awsConfig.LoadDefaultConfig(ctx, configOptions...)
	if err != nil {
		d.logger.Error("Failed to create AWS config", "error", err)
		return fmt.Errorf("could not create aws config: %w", err)
	}

	// If a role ARN is set, assume that role and use its (cached) credentials.
	if d.cfg.RoleARN != "" {
		assumeProvider := stscreds.NewAssumeRoleProvider(sts.NewFromConfig(cfg), d.cfg.RoleARN)
		cfg.Credentials = aws.NewCredentialsCache(assumeProvider)
	}

	ecClient := elasticache.NewFromConfig(cfg, func(options *elasticache.Options) {
		if d.cfg.Endpoint != "" {
			// Copy the string so the SDK does not alias the config struct.
			options.BaseEndpoint = aws.String(d.cfg.Endpoint)
		}
		options.HTTPClient = client
	})

	// Test credentials by making a simple API call before caching the client.
	testCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	if _, err := ecClient.DescribeCacheClusters(testCtx, &elasticache.DescribeCacheClustersInput{}); err != nil {
		d.logger.Error("Failed to test Elasticache credentials", "error", err)
		return fmt.Errorf("elasticache credential test failed: %w", err)
	}

	d.elasticacheClient = ecClient
	return nil
}
// describeServerlessCaches calls the DescribeServerlessCaches API for the given
// cache names (or lists every serverless cache, following pagination, when
// caches is empty) and returns the serverless caches found.
//
// Per-name lookups run concurrently. They are bounded by cfg.RequestConcurrency
// when it is positive; zero or negative means no limit. A limit of zero passed
// to errgroup would make Go block forever. On error the partial result is
// discarded and only the error is returned.
func (d *ElasticacheDiscovery) describeServerlessCaches(ctx context.Context, caches []string) ([]types.ServerlessCache, error) {
	var (
		mu               sync.Mutex
		serverlessCaches []types.ServerlessCache
	)
	appendCaches := func(found []types.ServerlessCache) {
		mu.Lock()
		defer mu.Unlock()
		serverlessCaches = append(serverlessCaches, found...)
	}

	errg, ectx := errgroup.WithContext(ctx)
	if d.cfg.RequestConcurrency > 0 {
		errg.SetLimit(d.cfg.RequestConcurrency)
	}

	if len(caches) == 0 {
		errg.Go(func() error {
			var nextToken *string
			for {
				output, err := d.elasticacheClient.DescribeServerlessCaches(ectx, &elasticache.DescribeServerlessCachesInput{
					MaxResults: aws.Int32(50),
					NextToken:  nextToken,
				})
				if err != nil {
					return fmt.Errorf("failed to describe serverless caches: %w", err)
				}
				appendCaches(output.ServerlessCaches)
				// A nil or empty token marks the last page. Treating "" as
				// "more" would loop forever.
				if output.NextToken == nil || *output.NextToken == "" {
					return nil
				}
				nextToken = output.NextToken
			}
		})
	} else {
		for _, cacheID := range caches {
			errg.Go(func() error {
				output, err := d.elasticacheClient.DescribeServerlessCaches(ectx, &elasticache.DescribeServerlessCachesInput{
					MaxResults:          aws.Int32(50),
					NextToken:           nil,
					ServerlessCacheName: aws.String(cacheID),
				})
				if err != nil {
					return fmt.Errorf("failed to describe serverless cache %s: %w", cacheID, err)
				}
				appendCaches(output.ServerlessCaches)
				return nil
			})
		}
	}

	// Wait before reading serverlessCaches. Reading it in the same return
	// expression as Wait() leaves the order of evaluation unspecified by the
	// Go spec, so the slice could be read while goroutines still append.
	if err := errg.Wait(); err != nil {
		return nil, err
	}
	return serverlessCaches, nil
}
// describeCacheClusters calls the DescribeCacheClusters API (with node info)
// for the given cache cluster IDs, or lists every cache cluster (following
// pagination) when caches is empty, and returns the clusters found.
//
// Every lookup is issued twice, once with ShowCacheClustersNotInReplicationGroups
// set to false and once with it set to true, and the responses are merged.
// Because the two result sets can contain the same cluster, the merged list is
// deduplicated by ARN; the first occurrence wins, and clusters without an ARN
// are kept.
//
// Requests run concurrently. They are bounded by cfg.RequestConcurrency when
// it is positive; zero or negative means no limit. A limit of zero passed to
// errgroup would make Go block forever. On error the partial result is
// discarded and only the error is returned.
func (d *ElasticacheDiscovery) describeCacheClusters(ctx context.Context, caches []string) ([]types.CacheCluster, error) {
	var (
		mu            sync.Mutex
		cacheClusters []types.CacheCluster
	)
	appendClusters := func(found []types.CacheCluster) {
		mu.Lock()
		defer mu.Unlock()
		cacheClusters = append(cacheClusters, found...)
	}

	errg, ectx := errgroup.WithContext(ctx)
	if d.cfg.RequestConcurrency > 0 {
		errg.SetLimit(d.cfg.RequestConcurrency)
	}

	showCacheClustersNotInReplicationGroupsBools := []bool{false, true}
	if len(caches) == 0 {
		for _, notInReplicationGroups := range showCacheClustersNotInReplicationGroupsBools {
			errg.Go(func() error {
				var marker *string
				for {
					output, err := d.elasticacheClient.DescribeCacheClusters(ectx, &elasticache.DescribeCacheClustersInput{
						MaxRecords:                              aws.Int32(100),
						Marker:                                  marker,
						ShowCacheNodeInfo:                       aws.Bool(true),
						ShowCacheClustersNotInReplicationGroups: aws.Bool(notInReplicationGroups),
					})
					if err != nil {
						return fmt.Errorf("failed to describe cache clusters: %w", err)
					}
					appendClusters(output.CacheClusters)
					// A nil or empty marker marks the last page. Treating "" as
					// "more" would loop forever.
					if output.Marker == nil || *output.Marker == "" {
						return nil
					}
					marker = output.Marker
				}
			})
		}
	} else {
		for _, cacheID := range caches {
			for _, notInReplicationGroups := range showCacheClustersNotInReplicationGroupsBools {
				errg.Go(func() error {
					output, err := d.elasticacheClient.DescribeCacheClusters(ectx, &elasticache.DescribeCacheClustersInput{
						MaxRecords:                              aws.Int32(100),
						Marker:                                  nil,
						ShowCacheNodeInfo:                       aws.Bool(true),
						ShowCacheClustersNotInReplicationGroups: aws.Bool(notInReplicationGroups),
						CacheClusterId:                          aws.String(cacheID),
					})
					if err != nil {
						return fmt.Errorf("failed to describe cache cluster %s: %w", cacheID, err)
					}
					appendClusters(output.CacheClusters)
					return nil
				})
			}
		}
	}

	// Wait before reading cacheClusters. Reading it in the same return
	// expression as Wait() leaves the order of evaluation unspecified by the
	// Go spec, so the slice could be read while goroutines still append.
	if err := errg.Wait(); err != nil {
		return nil, err
	}

	// Drop clusters returned by both queries. The filter runs in place, which
	// is safe because the write index never passes the read index.
	seen := make(map[string]struct{}, len(cacheClusters))
	deduped := cacheClusters[:0]
	for i := range cacheClusters {
		if arn := cacheClusters[i].ARN; arn != nil {
			if _, dup := seen[*arn]; dup {
				continue
			}
			seen[*arn] = struct{}{}
		}
		deduped = append(deduped, cacheClusters[i])
	}
	return deduped, nil
}
// listTagsForResource calls the ListTagsForResource API for each distinct
// resource ARN and returns a map of resource ARN to its tag list.
//
// Duplicate ARNs in resourceARNs are requested only once. Requests run
// concurrently. They are bounded by cfg.RequestConcurrency when it is
// positive; zero or negative means no limit, since a limit of zero would make
// errgroup.Go block forever. On error the partial map is discarded and only
// the error is returned.
func (d *ElasticacheDiscovery) listTagsForResource(ctx context.Context, resourceARNs []string) (map[string][]types.Tag, error) {
	var mu sync.Mutex
	tagsByResourceARN := make(map[string][]types.Tag, len(resourceARNs))

	errg, ectx := errgroup.WithContext(ctx)
	if d.cfg.RequestConcurrency > 0 {
		errg.SetLimit(d.cfg.RequestConcurrency)
	}

	requested := make(map[string]struct{}, len(resourceARNs))
	for _, resourceARN := range resourceARNs {
		if _, dup := requested[resourceARN]; dup {
			continue
		}
		requested[resourceARN] = struct{}{}

		errg.Go(func() error {
			output, err := d.elasticacheClient.ListTagsForResource(ectx, &elasticache.ListTagsForResourceInput{
				ResourceName: aws.String(resourceARN),
			})
			if err != nil {
				return fmt.Errorf("failed to list tags for resource %s: %w", resourceARN, err)
			}
			mu.Lock()
			tagsByResourceARN[resourceARN] = output.TagList
			mu.Unlock()
			return nil
		})
	}

	// Wait before handing out the map, so no goroutine is still writing to it.
	if err := errg.Wait(); err != nil {
		return nil, err
	}
	return tagsByResourceARN, nil
}
// refresh performs one discovery cycle and returns a single target group whose
// Source is the configured region.
//
// Steps:
//  1. Lazily initialise the AWS client.
//  2. Describe serverless caches and cache clusters, once each.
//  3. Fetch the tags of every resource that has an ARN.
//  4. Build targets from the same listings whose tags were fetched.
//
// Targets are built sequentially on the calling goroutine. Appending to
// tg.Targets from concurrent goroutines would be a data race. Describing each
// resource type only once also keeps the tag lookup and the targets consistent
// within a cycle.
//
// NOTE(review): when cfg.Clusters lists ARNs of only one deployment option, the
// other option gets an empty ID list, and the describe helpers then list every
// resource of that type. Confirm whether that is intended.
func (d *ElasticacheDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
	if err := d.initElasticacheClient(ctx); err != nil {
		return nil, err
	}

	serverlessCacheIDs, cacheClusterIDs := splitCacheDeploymentOptions(d.cfg.Clusters)

	serverlessCaches, err := d.describeServerlessCaches(ctx, serverlessCacheIDs)
	if err != nil {
		return nil, fmt.Errorf("failed to describe serverless caches: %w", err)
	}
	cacheClusters, err := d.describeCacheClusters(ctx, cacheClusterIDs)
	if err != nil {
		return nil, fmt.Errorf("failed to describe cache clusters: %w", err)
	}

	resourceARNs := make([]string, 0, len(serverlessCaches)+len(cacheClusters))
	for i := range serverlessCaches {
		if arn := serverlessCaches[i].ARN; arn != nil {
			resourceARNs = append(resourceARNs, *arn)
		}
	}
	for i := range cacheClusters {
		if arn := cacheClusters[i].ARN; arn != nil {
			resourceARNs = append(resourceARNs, *arn)
		}
	}

	tagsByResourceARN, err := d.listTagsForResource(ctx, resourceARNs)
	if err != nil {
		return nil, fmt.Errorf("failed to list tags for resources: %w", err)
	}

	// arnKey maps a possibly-nil ARN to a tag-map key. A nil ARN maps to "",
	// for which no tags were requested.
	arnKey := func(arn *string) string {
		if arn == nil {
			return ""
		}
		return *arn
	}

	tg := &targetgroup.Group{
		Source: d.cfg.Region,
	}
	for _, cache := range serverlessCaches {
		addServerlessCacheTargets(tg, &cache, tagsByResourceARN[arnKey(cache.ARN)])
	}
	for _, cluster := range cacheClusters {
		addCacheClusterTargets(tg, &cluster, tagsByResourceARN[arnKey(cluster.ARN)])
	}

	return []*targetgroup.Group{tg}, nil
}
// splitCacheDeploymentOptions takes a list of ElastiCache ARNs and splits them
// into serverless cache names and cache cluster IDs based on their resource
// type:
//
//	arn:aws:elasticache:<REGION>:<ACCOUNT_ID>:serverlesscache:<CACHE_NAME>        -> serverlessCacheIDs
//	arn:aws:elasticache:<REGION>:<ACCOUNT_ID>:replicationgroup:<CACHE_CLUSTER_ID> -> cacheClusterIDs
//
// The following entries are skipped: empty entries, ARNs with fewer than seven
// colon-separated fields, ARNs with an empty resource ID, and any other
// resource type.
//
// NOTE(review): replicationgroup IDs are later passed to DescribeCacheClusters
// as CacheClusterId. Confirm that this resolves the intended clusters.
func splitCacheDeploymentOptions(caches []string) (serverlessCacheIDs, cacheClusterIDs []string) {
	for _, cacheARN := range caches {
		if cacheARN == "" {
			continue
		}
		// Seven fields: arn, partition, service, region, account, type, id.
		// SplitN keeps the resource ID intact if it ever contains a colon.
		parts := strings.SplitN(cacheARN, ":", 7)
		if len(parts) < 7 {
			// Too short to carry a resource ID. Indexing parts[6] would panic.
			continue
		}
		resourceType, resourceID := parts[5], parts[6]
		if resourceID == "" {
			continue
		}
		switch resourceType {
		case "serverlesscache":
			serverlessCacheIDs = append(serverlessCacheIDs, resourceID)
		case "replicationgroup":
			cacheClusterIDs = append(cacheClusterIDs, resourceID)
		}
	}
	return serverlessCacheIDs, cacheClusterIDs
}
// addServerlessCacheTargets appends one target describing a serverless cache to tg.
//
// The deployment option label is always "serverless". Every other label comes
// from an optional (pointer) field of the API response and is set only when
// that field is non-nil. As a result, a partially populated response cannot
// cause a nil dereference, which would panic the refresh goroutine.
//
// Each tag with both a key and a value becomes one label, named after the
// sanitized tag key. __address__ is set to the primary endpoint's host:port
// when both are known.
func addServerlessCacheTargets(tg *targetgroup.Group, cache *types.ServerlessCache, tags []types.Tag) {
	labels := model.LabelSet{
		elasticacheLabelDeploymentOption: model.LabelValue("serverless"),
	}

	// Identity, state and engine. These are guarded like every other field below.
	if cache.ARN != nil {
		labels[elasticacheLabelServerlessCacheARN] = model.LabelValue(*cache.ARN)
	}
	if cache.ServerlessCacheName != nil {
		labels[elasticacheLabelServerlessCacheName] = model.LabelValue(*cache.ServerlessCacheName)
	}
	if cache.Status != nil {
		labels[elasticacheLabelServerlessCacheStatus] = model.LabelValue(*cache.Status)
	}
	if cache.Engine != nil {
		labels[elasticacheLabelServerlessCacheEngine] = model.LabelValue(*cache.Engine)
	}
	if cache.FullEngineVersion != nil {
		labels[elasticacheLabelServerlessCacheFullEngineVersion] = model.LabelValue(*cache.FullEngineVersion)
	}
	if cache.MajorEngineVersion != nil {
		labels[elasticacheLabelServerlessCacheMajorEngineVersion] = model.LabelValue(*cache.MajorEngineVersion)
	}

	if cache.Description != nil {
		labels[elasticacheLabelServerlessCacheDescription] = model.LabelValue(*cache.Description)
	}
	if cache.CreateTime != nil {
		labels[elasticacheLabelServerlessCacheCreateTime] = model.LabelValue(cache.CreateTime.Format(time.RFC3339))
	}
	if cache.KmsKeyId != nil {
		labels[elasticacheLabelServerlessCacheKmsKeyID] = model.LabelValue(*cache.KmsKeyId)
	}
	if cache.UserGroupId != nil {
		labels[elasticacheLabelServerlessCacheUserGroupID] = model.LabelValue(*cache.UserGroupId)
	}
	if cache.DailySnapshotTime != nil {
		labels[elasticacheLabelServerlessCacheDailySnapshotTime] = model.LabelValue(*cache.DailySnapshotTime)
	}
	if cache.SnapshotRetentionLimit != nil {
		labels[elasticacheLabelServerlessCacheSnapshotRetentionLimit] = model.LabelValue(strconv.Itoa(int(*cache.SnapshotRetentionLimit)))
	}

	if cache.Endpoint != nil {
		if cache.Endpoint.Address != nil {
			labels[elasticacheLabelServerlessCacheEndpointAddress] = model.LabelValue(*cache.Endpoint.Address)
		}
		if cache.Endpoint.Port != nil {
			labels[elasticacheLabelServerlessCacheEndpointPort] = model.LabelValue(strconv.Itoa(int(*cache.Endpoint.Port)))
		}
	}
	if cache.ReaderEndpoint != nil {
		if cache.ReaderEndpoint.Address != nil {
			labels[elasticacheLabelServerlessCacheReaderEndpointAddress] = model.LabelValue(*cache.ReaderEndpoint.Address)
		}
		if cache.ReaderEndpoint.Port != nil {
			labels[elasticacheLabelServerlessCacheReaderEndpointPort] = model.LabelValue(strconv.Itoa(int(*cache.ReaderEndpoint.Port)))
		}
	}

	// Slice fields become indexed labels: <name>_0, <name>_1, ...
	for i, sgID := range cache.SecurityGroupIds {
		labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelServerlessCacheSecurityGroupID, i))] = model.LabelValue(sgID)
	}
	for i, subnetID := range cache.SubnetIds {
		labels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelServerlessCacheSubnetID, i))] = model.LabelValue(subnetID)
	}

	if limits := cache.CacheUsageLimits; limits != nil {
		if storage := limits.DataStorage; storage != nil {
			if storage.Maximum != nil {
				labels[elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMaximum] = model.LabelValue(strconv.Itoa(int(*storage.Maximum)))
			}
			if storage.Minimum != nil {
				labels[elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageMinimum] = model.LabelValue(strconv.Itoa(int(*storage.Minimum)))
			}
			labels[elasticacheLabelServerlessCacheCacheUsageLimitCacheDataStorageUnit] = model.LabelValue(storage.Unit)
		}
		if ecpu := limits.ECPUPerSecond; ecpu != nil {
			if ecpu.Maximum != nil {
				labels[elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMaximum] = model.LabelValue(strconv.Itoa(int(*ecpu.Maximum)))
			}
			if ecpu.Minimum != nil {
				labels[elasticacheLabelServerlessCacheCacheUsageLimitECPUPerSecondMinimum] = model.LabelValue(strconv.Itoa(int(*ecpu.Minimum)))
			}
		}
	}

	for _, tag := range tags {
		if tag.Key != nil && tag.Value != nil {
			labels[model.LabelName(elasticacheLabelServerlessCacheTag+strutil.SanitizeLabelName(*tag.Key))] = model.LabelValue(*tag.Value)
		}
	}

	// Set the address label using the primary endpoint.
	if cache.Endpoint != nil && cache.Endpoint.Address != nil && cache.Endpoint.Port != nil {
		labels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(*cache.Endpoint.Address, strconv.Itoa(int(*cache.Endpoint.Port))))
	}

	tg.Targets = append(tg.Targets, labels)
}
// addCacheClusterTargets adds targets for a cache cluster to the target group.
//
// One target is created per entry in cluster.CacheNodes so that each node can be
// scraped individually. Cluster-wide attributes and the supplied tags are
// attached to every node target; node-specific attributes (ID, status, create
// time, endpoint, ...) are added per target. The __address__ label is set only
// when the node endpoint carries both an address and a port. If the cluster
// lists no cache nodes, no targets are added.
//
// Every pointer field of the cluster is nil-checked before it is dereferenced,
// so a partially populated CacheCluster cannot cause a nil-pointer panic here.
func addCacheClusterTargets(tg *targetgroup.Group, cluster *types.CacheCluster, tags []types.Tag) {
	// Labels shared by every node target in this cluster.
	commonLabels := model.LabelSet{
		elasticacheLabelDeploymentOption: model.LabelValue("node"),
	}
	// ARN, ID and status are optional pointers in the SDK types; guard them like
	// every other optional field instead of dereferencing unconditionally.
	if cluster.ARN != nil {
		commonLabels[elasticacheLabelCacheClusterARN] = model.LabelValue(*cluster.ARN)
	}
	if cluster.CacheClusterId != nil {
		commonLabels[elasticacheLabelCacheClusterID] = model.LabelValue(*cluster.CacheClusterId)
	}
	if cluster.CacheClusterStatus != nil {
		commonLabels[elasticacheLabelCacheClusterStatus] = model.LabelValue(*cluster.CacheClusterStatus)
	}
	if cluster.AtRestEncryptionEnabled != nil {
		commonLabels[elasticacheLabelCacheClusterAtRestEncryptionEnabled] = model.LabelValue(strconv.FormatBool(*cluster.AtRestEncryptionEnabled))
	}
	if cluster.AuthTokenEnabled != nil {
		commonLabels[elasticacheLabelCacheClusterAuthTokenEnabled] = model.LabelValue(strconv.FormatBool(*cluster.AuthTokenEnabled))
	}
	if cluster.AuthTokenLastModifiedDate != nil {
		commonLabels[elasticacheLabelCacheClusterAuthTokenLastModified] = model.LabelValue(cluster.AuthTokenLastModifiedDate.Format(time.RFC3339))
	}
	if cluster.AutoMinorVersionUpgrade != nil {
		commonLabels[elasticacheLabelCacheClusterAutoMinorVersionUpgrade] = model.LabelValue(strconv.FormatBool(*cluster.AutoMinorVersionUpgrade))
	}
	if cluster.CacheClusterCreateTime != nil {
		commonLabels[elasticacheLabelCacheClusterCreateTime] = model.LabelValue(cluster.CacheClusterCreateTime.Format(time.RFC3339))
	}
	if cluster.CacheNodeType != nil {
		commonLabels[elasticacheLabelCacheClusterNodeType] = model.LabelValue(*cluster.CacheNodeType)
	}
	if cluster.CacheParameterGroup != nil && cluster.CacheParameterGroup.CacheParameterGroupName != nil {
		commonLabels[elasticacheLabelCacheClusterParameterGroup] = model.LabelValue(*cluster.CacheParameterGroup.CacheParameterGroupName)
	}
	if cluster.CacheSubnetGroupName != nil {
		commonLabels[elasticacheLabelCacheClusterSubnetGroupName] = model.LabelValue(*cluster.CacheSubnetGroupName)
	}
	if cluster.ClientDownloadLandingPage != nil {
		commonLabels[elasticacheLabelCacheClusterClientDownloadLandingPage] = model.LabelValue(*cluster.ClientDownloadLandingPage)
	}
	if cluster.ConfigurationEndpoint != nil {
		if cluster.ConfigurationEndpoint.Address != nil {
			commonLabels[elasticacheLabelCacheClusterConfigurationEndpointAddress] = model.LabelValue(*cluster.ConfigurationEndpoint.Address)
		}
		if cluster.ConfigurationEndpoint.Port != nil {
			commonLabels[elasticacheLabelCacheClusterConfigurationEndpointPort] = model.LabelValue(strconv.Itoa(int(*cluster.ConfigurationEndpoint.Port)))
		}
	}
	if cluster.Engine != nil {
		commonLabels[elasticacheLabelCacheClusterEngine] = model.LabelValue(*cluster.Engine)
	}
	if cluster.EngineVersion != nil {
		commonLabels[elasticacheLabelCacheClusterEngineVersion] = model.LabelValue(*cluster.EngineVersion)
	}
	// The SDK enum types below are string-kinded; an empty value means "unset".
	if cluster.IpDiscovery != "" {
		commonLabels[elasticacheLabelCacheClusterIPDiscovery] = model.LabelValue(cluster.IpDiscovery)
	}
	if cluster.NetworkType != "" {
		commonLabels[elasticacheLabelCacheClusterNetworkType] = model.LabelValue(cluster.NetworkType)
	}
	if cluster.NotificationConfiguration != nil {
		if cluster.NotificationConfiguration.TopicArn != nil {
			commonLabels[elasticacheLabelCacheClusterNotificationTopicARN] = model.LabelValue(*cluster.NotificationConfiguration.TopicArn)
		}
		if cluster.NotificationConfiguration.TopicStatus != nil {
			commonLabels[elasticacheLabelCacheClusterNotificationTopicStatus] = model.LabelValue(*cluster.NotificationConfiguration.TopicStatus)
		}
	}
	if cluster.NumCacheNodes != nil {
		commonLabels[elasticacheLabelCacheClusterNumCacheNodes] = model.LabelValue(strconv.Itoa(int(*cluster.NumCacheNodes)))
	}
	if cluster.PreferredAvailabilityZone != nil {
		commonLabels[elasticacheLabelCacheClusterPreferredAvailabilityZone] = model.LabelValue(*cluster.PreferredAvailabilityZone)
	}
	if cluster.PreferredMaintenanceWindow != nil {
		commonLabels[elasticacheLabelCacheClusterPreferredMaintenanceWindow] = model.LabelValue(*cluster.PreferredMaintenanceWindow)
	}
	if cluster.PreferredOutpostArn != nil {
		commonLabels[elasticacheLabelCacheClusterPreferredOutpostARN] = model.LabelValue(*cluster.PreferredOutpostArn)
	}
	if cluster.ReplicationGroupId != nil {
		commonLabels[elasticacheLabelCacheClusterReplicationGroupID] = model.LabelValue(*cluster.ReplicationGroupId)
	}
	if cluster.ReplicationGroupLogDeliveryEnabled != nil {
		commonLabels[elasticacheLabelCacheClusterReplicationGroupLogDeliveryEnabled] = model.LabelValue(strconv.FormatBool(*cluster.ReplicationGroupLogDeliveryEnabled))
	}
	if cluster.SnapshotRetentionLimit != nil {
		commonLabels[elasticacheLabelCacheClusterSnapshotRetentionLimit] = model.LabelValue(strconv.Itoa(int(*cluster.SnapshotRetentionLimit)))
	}
	if cluster.SnapshotWindow != nil {
		commonLabels[elasticacheLabelCacheClusterSnapshotWindow] = model.LabelValue(*cluster.SnapshotWindow)
	}
	if cluster.TransitEncryptionEnabled != nil {
		commonLabels[elasticacheLabelCacheClusterTransitEncryptionEnabled] = model.LabelValue(strconv.FormatBool(*cluster.TransitEncryptionEnabled))
	}
	if cluster.TransitEncryptionMode != "" {
		commonLabels[elasticacheLabelCacheClusterTransitEncryptionMode] = model.LabelValue(cluster.TransitEncryptionMode)
	}
	// Log delivery configurations: one suffixed label per configuration index.
	for i, logDelivery := range cluster.LogDeliveryConfigurations {
		if logDelivery.DestinationType != "" {
			commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationDestinationType, i))] = model.LabelValue(logDelivery.DestinationType)
		}
		if logDelivery.LogFormat != "" {
			commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationLogFormat, i))] = model.LabelValue(logDelivery.LogFormat)
		}
		if logDelivery.LogType != "" {
			commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationLogType, i))] = model.LabelValue(logDelivery.LogType)
		}
		if logDelivery.Status != "" {
			commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationStatus, i))] = model.LabelValue(logDelivery.Status)
		}
		if logDelivery.Message != nil {
			commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationMessage, i))] = model.LabelValue(*logDelivery.Message)
		}
		if logDelivery.DestinationDetails != nil {
			if logDelivery.DestinationDetails.CloudWatchLogsDetails != nil && logDelivery.DestinationDetails.CloudWatchLogsDetails.LogGroup != nil {
				commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationLogGroup, i))] = model.LabelValue(*logDelivery.DestinationDetails.CloudWatchLogsDetails.LogGroup)
			}
			if logDelivery.DestinationDetails.KinesisFirehoseDetails != nil && logDelivery.DestinationDetails.KinesisFirehoseDetails.DeliveryStream != nil {
				commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterLogDeliveryConfigurationDeliveryStream, i))] = model.LabelValue(*logDelivery.DestinationDetails.KinesisFirehoseDetails.DeliveryStream)
			}
		}
	}
	// Pending modified values.
	if pending := cluster.PendingModifiedValues; pending != nil {
		if pending.AuthTokenStatus != "" {
			commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesAuthTokenStatus] = model.LabelValue(pending.AuthTokenStatus)
		}
		if pending.CacheNodeType != nil {
			commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeType] = model.LabelValue(*pending.CacheNodeType)
		}
		if pending.EngineVersion != nil {
			commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesEngineVersion] = model.LabelValue(*pending.EngineVersion)
		}
		if pending.NumCacheNodes != nil {
			commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesNumCacheNodes] = model.LabelValue(strconv.Itoa(int(*pending.NumCacheNodes)))
		}
		if pending.TransitEncryptionEnabled != nil {
			commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionEnabled] = model.LabelValue(strconv.FormatBool(*pending.TransitEncryptionEnabled))
		}
		if pending.TransitEncryptionMode != "" {
			commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesTransitEncryptionMode] = model.LabelValue(pending.TransitEncryptionMode)
		}
		if len(pending.CacheNodeIdsToRemove) > 0 {
			// Node IDs are flattened into a single comma-separated label value.
			commonLabels[elasticacheLabelCacheClusterPendingModifiedValuesCacheNodeIDsToRemove] = model.LabelValue(strings.Join(pending.CacheNodeIdsToRemove, ","))
		}
	}
	// Security group membership: one suffixed label per membership index.
	for i, sg := range cluster.SecurityGroups {
		if sg.SecurityGroupId != nil {
			commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterSecurityGroupMembershipID, i))] = model.LabelValue(*sg.SecurityGroupId)
		}
		if sg.Status != nil {
			commonLabels[model.LabelName(fmt.Sprintf("%s_%d", elasticacheLabelCacheClusterSecurityGroupMembershipStatus, i))] = model.LabelValue(*sg.Status)
		}
	}
	// Tags: keys are sanitized into valid label-name suffixes; tags missing a
	// key or a value are skipped.
	for _, tag := range tags {
		if tag.Key != nil && tag.Value != nil {
			commonLabels[model.LabelName(elasticacheLabelCacheClusterTag+strutil.SanitizeLabelName(*tag.Key))] = model.LabelValue(*tag.Value)
		}
	}
	// Create one target per cache node.
	for _, node := range cluster.CacheNodes {
		// Each target gets its own copy so node-specific labels don't leak
		// between targets.
		labels := maps.Clone(commonLabels)
		if node.CacheNodeId != nil {
			labels[elasticacheLabelCacheClusterNodeID] = model.LabelValue(*node.CacheNodeId)
		}
		if node.CacheNodeStatus != nil {
			labels[elasticacheLabelCacheClusterNodeStatus] = model.LabelValue(*node.CacheNodeStatus)
		}
		if node.CacheNodeCreateTime != nil {
			labels[elasticacheLabelCacheClusterNodeCreateTime] = model.LabelValue(node.CacheNodeCreateTime.Format(time.RFC3339))
		}
		if node.CustomerAvailabilityZone != nil {
			labels[elasticacheLabelCacheClusterNodeAZ] = model.LabelValue(*node.CustomerAvailabilityZone)
		}
		if node.CustomerOutpostArn != nil {
			labels[elasticacheLabelCacheClusterNodeCustomerOutpostARN] = model.LabelValue(*node.CustomerOutpostArn)
		}
		if node.SourceCacheNodeId != nil {
			labels[elasticacheLabelCacheClusterNodeSourceCacheNodeID] = model.LabelValue(*node.SourceCacheNodeId)
		}
		if node.ParameterGroupStatus != nil {
			labels[elasticacheLabelCacheClusterNodeParameterGroupStatus] = model.LabelValue(*node.ParameterGroupStatus)
		}
		if node.Endpoint != nil {
			if node.Endpoint.Address != nil {
				labels[elasticacheLabelCacheClusterNodeEndpointAddress] = model.LabelValue(*node.Endpoint.Address)
			}
			if node.Endpoint.Port != nil {
				labels[elasticacheLabelCacheClusterNodeEndpointPort] = model.LabelValue(strconv.Itoa(int(*node.Endpoint.Port)))
			}
			// The scrape address needs both host and port; JoinHostPort
			// brackets IPv6 hosts correctly.
			if node.Endpoint.Address != nil && node.Endpoint.Port != nil {
				labels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(*node.Endpoint.Address, strconv.Itoa(int(*node.Endpoint.Port))))
			}
		}
		tg.Targets = append(tg.Targets, labels)
	}
}