Joe Adams 68df59ba0c
Merge pull request #18029 from matt-gp/ecs-sd-bug-17920
AWS SD: ECS Discover Standalone Tasks
2026-02-10 22:03:28 -05:00

996 lines
33 KiB
Go

// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package aws
import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"slices"
"strconv"
"strings"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
awsConfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/ecs"
"github.com/aws/aws-sdk-go-v2/service/ecs/types"
"github.com/aws/aws-sdk-go-v2/service/sts"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"golang.org/x/sync/errgroup"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/refresh"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/strutil"
)
// Meta labels attached to targets discovered by ECS SD.
const (
	ecsLabel = model.MetaLabelPrefix + "ecs_"

	// Cluster, service and task identity.
	ecsLabelCluster        = ecsLabel + "cluster"
	ecsLabelClusterARN     = ecsLabel + "cluster_arn"
	ecsLabelService        = ecsLabel + "service"
	ecsLabelServiceARN     = ecsLabel + "service_arn"
	ecsLabelServiceStatus  = ecsLabel + "service_status"
	ecsLabelTaskGroup      = ecsLabel + "task_group"
	ecsLabelTaskARN        = ecsLabel + "task_arn"
	ecsLabelTaskDefinition = ecsLabel + "task_definition"

	// Placement and networking.
	ecsLabelRegion           = ecsLabel + "region"
	ecsLabelAvailabilityZone = ecsLabel + "availability_zone"
	ecsLabelSubnetID         = ecsLabel + "subnet_id"
	ecsLabelIPAddress        = ecsLabel + "ip_address"
	ecsLabelPublicIP         = ecsLabel + "public_ip"
	ecsLabelNetworkMode      = ecsLabel + "network_mode"

	// Task runtime state.
	ecsLabelLaunchType      = ecsLabel + "launch_type"
	ecsLabelDesiredStatus   = ecsLabel + "desired_status"
	ecsLabelLastStatus      = ecsLabel + "last_status"
	ecsLabelHealthStatus    = ecsLabel + "health_status"
	ecsLabelPlatformFamily  = ecsLabel + "platform_family"
	ecsLabelPlatformVersion = ecsLabel + "platform_version"

	// Container instance and EC2 host the task runs on, when there is one.
	ecsLabelContainerInstanceARN = ecsLabel + "container_instance_arn"
	ecsLabelEC2InstanceID        = ecsLabel + "ec2_instance_id"
	ecsLabelEC2InstanceType      = ecsLabel + "ec2_instance_type"
	ecsLabelEC2InstancePrivateIP = ecsLabel + "ec2_instance_private_ip"
	ecsLabelEC2InstancePublicIP  = ecsLabel + "ec2_instance_public_ip"

	// Prefixes for resource tags; the sanitized tag key is appended to them.
	ecsLabelTag        = ecsLabel + "tag_"
	ecsLabelTagCluster = ecsLabelTag + "cluster_"
	ecsLabelTagService = ecsLabelTag + "service_"
	ecsLabelTagTask    = ecsLabelTag + "task_"
	ecsLabelTagEC2     = ecsLabelTag + "ec2_"
)
// DefaultECSSDConfig is the default ECS SD configuration. UnmarshalYAML starts
// from these values before applying the user's settings.
var DefaultECSSDConfig = ECSSDConfig{
	Port:               80,
	RefreshInterval:    model.Duration(time.Minute),
	RequestConcurrency: 20, // matches the 20 req/sec sustained limit of the ECS read APIs
	HTTPClientConfig:   config.DefaultHTTPClientConfig,
}
func init() {
discovery.RegisterConfig(&ECSSDConfig{})
}
// ECSSDConfig is the configuration for ECS based service discovery.
type ECSSDConfig struct {
	// Region is the AWS region to query. UnmarshalYAML passes it through
	// loadRegion to determine the effective region.
	Region string `yaml:"region"`
	// Endpoint, when set, overrides the base endpoint of the ECS client only;
	// it is not applied to the EC2 client.
	Endpoint string `yaml:"endpoint"`
	// AccessKey and SecretKey configure static credentials. They are used only
	// when both are set; otherwise the AWS SDK default credential chain applies.
	AccessKey string        `yaml:"access_key,omitempty"`
	SecretKey config.Secret `yaml:"secret_key,omitempty"`
	// Profile selects a named profile from the shared AWS configuration.
	Profile string `yaml:"profile,omitempty"`
	// RoleARN, when set, is assumed through STS and its credentials are used.
	RoleARN string `yaml:"role_arn,omitempty"`
	// Clusters restricts discovery to the listed clusters. When empty, every
	// cluster returned by ListClusters is discovered.
	Clusters []string `yaml:"clusters,omitempty"`
	// Port is joined with each task's IP address to form the target address.
	Port int `yaml:"port"`
	// RefreshInterval is how often targets are re-discovered.
	RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`
	// RequestConcurrency controls the maximum number of concurrent ECS API requests.
	// Default is 20, which aligns with AWS ECS sustained rate limits:
	// - Cluster read actions (DescribeClusters, ListClusters): 20 req/sec sustained
	// - Service read actions (DescribeServices, ListServices): 20 req/sec sustained
	// - Cluster resource read actions (DescribeTasks, ListTasks): 20 req/sec sustained
	// See: https://docs.aws.amazon.com/AmazonECS/latest/APIReference/request-throttling.html
	// NOTE(review): the value is passed to errgroup.SetLimit, where 0 blocks every
	// request; confirm it is validated before use.
	RequestConcurrency int `yaml:"request_concurrency,omitempty"`
	// HTTPClientConfig configures the HTTP client shared by the ECS and EC2 clients.
	HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
}
// NewDiscovererMetrics implements discovery.Config. The returned metrics carry
// the refresh metrics instantiator; the Registerer argument is ignored here.
func (*ECSSDConfig) NewDiscovererMetrics(_ prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
	metrics := new(ecsMetrics)
	metrics.refreshMetrics = rmi
	return metrics
}
// Name returns the name of the ECS Config, "ecs".
func (*ECSSDConfig) Name() string {
	const name = "ecs"
	return name
}
// NewDiscoverer returns a Discoverer for the ECS Config.
//
// The error is checked before the concrete *ECSDiscovery is converted to the
// discovery.Discoverer interface: returning a nil *ECSDiscovery directly would
// yield a non-nil interface holding a typed nil pointer.
func (c *ECSSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
	d, err := NewECSDiscovery(c, opts)
	if err != nil {
		return nil, err
	}
	return d, nil
}
// UnmarshalYAML implements the yaml.Unmarshaler interface for the ECS Config.
//
// It starts from DefaultECSSDConfig, applies the user's YAML, rejects a
// non-positive request_concurrency, resolves the region through loadRegion and
// finally validates the HTTP client configuration.
func (c *ECSSDConfig) UnmarshalYAML(unmarshal func(any) error) error {
	*c = DefaultECSSDConfig
	type plain ECSSDConfig
	if err := unmarshal((*plain)(c)); err != nil {
		return err
	}

	// RequestConcurrency is handed to errgroup.SetLimit. A limit of 0 makes every
	// errgroup.Go call block forever, which would hang each refresh, and a
	// negative limit removes the bound altogether. Reject both up front.
	if c.RequestConcurrency <= 0 {
		return fmt.Errorf("request_concurrency must be greater than 0, got %d", c.RequestConcurrency)
	}

	var err error
	c.Region, err = loadRegion(context.Background(), c.Region)
	if err != nil {
		return fmt.Errorf("could not determine AWS region: %w", err)
	}
	return c.HTTPClientConfig.Validate()
}
// ecsClient is the subset of the Amazon ECS API that ECS SD calls. The
// *ecs.Client built in initEcsClient satisfies it.
type ecsClient interface {
	ListClusters(ctx context.Context, params *ecs.ListClustersInput, optFns ...func(*ecs.Options)) (*ecs.ListClustersOutput, error)
	DescribeClusters(ctx context.Context, params *ecs.DescribeClustersInput, optFns ...func(*ecs.Options)) (*ecs.DescribeClustersOutput, error)
	ListServices(ctx context.Context, params *ecs.ListServicesInput, optFns ...func(*ecs.Options)) (*ecs.ListServicesOutput, error)
	DescribeServices(ctx context.Context, params *ecs.DescribeServicesInput, optFns ...func(*ecs.Options)) (*ecs.DescribeServicesOutput, error)
	ListTasks(ctx context.Context, params *ecs.ListTasksInput, optFns ...func(*ecs.Options)) (*ecs.ListTasksOutput, error)
	DescribeTasks(ctx context.Context, params *ecs.DescribeTasksInput, optFns ...func(*ecs.Options)) (*ecs.DescribeTasksOutput, error)
	DescribeContainerInstances(ctx context.Context, params *ecs.DescribeContainerInstancesInput, optFns ...func(*ecs.Options)) (*ecs.DescribeContainerInstancesOutput, error)
}
// ecsEC2Client is the subset of the Amazon EC2 API that ECS SD calls to look up
// the hosts and network interfaces of tasks. The *ec2.Client built in
// initEcsClient satisfies it.
type ecsEC2Client interface {
	DescribeInstances(ctx context.Context, params *ec2.DescribeInstancesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeInstancesOutput, error)
	DescribeNetworkInterfaces(ctx context.Context, params *ec2.DescribeNetworkInterfacesInput, optFns ...func(*ec2.Options)) (*ec2.DescribeNetworkInterfacesOutput, error)
}
// ECSDiscovery periodically performs ECS-SD requests. It implements
// the Discoverer interface.
type ECSDiscovery struct {
	// Discovery drives the periodic refresh; NewECSDiscovery sets its
	// RefreshF to this type's refresh method.
	*refresh.Discovery
	logger *slog.Logger
	cfg    *ECSSDConfig
	// ecs and ec2 are the AWS API clients. initEcsClient creates them on the
	// first refresh unless both are already set.
	ecs ecsClient
	ec2 ecsEC2Client
}
// NewECSDiscovery returns a new ECSDiscovery which periodically refreshes its
// targets. No AWS clients are created here; that happens lazily on the first
// refresh. A nil logger in opts is replaced by a no-op logger, and opts.Metrics
// must be the value produced by ECSSDConfig.NewDiscovererMetrics.
func NewECSDiscovery(conf *ECSSDConfig, opts discovery.DiscovererOptions) (*ECSDiscovery, error) {
	metrics, ok := opts.Metrics.(*ecsMetrics)
	if !ok {
		return nil, errors.New("invalid discovery metrics type")
	}

	logger := opts.Logger
	if logger == nil {
		logger = promslog.NewNopLogger()
	}

	d := &ECSDiscovery{cfg: conf, logger: logger}
	d.Discovery = refresh.NewDiscovery(refresh.Options{
		Logger:              logger,
		Mech:                "ecs",
		Interval:            time.Duration(conf.RefreshInterval),
		RefreshF:            d.refresh,
		MetricsInstantiator: metrics.refreshMetrics,
	})
	return d, nil
}
func (d *ECSDiscovery) initEcsClient(ctx context.Context) error {
if d.ecs != nil && d.ec2 != nil {
return nil
}
if d.cfg.Region == "" {
return errors.New("region must be set for ECS service discovery")
}
// Build the HTTP client from the provided HTTPClientConfig.
client, err := config.NewClientFromConfig(d.cfg.HTTPClientConfig, "ecs_sd")
if err != nil {
return err
}
// Build the AWS config with the provided region.
var configOptions []func(*awsConfig.LoadOptions) error
configOptions = append(configOptions, awsConfig.WithRegion(d.cfg.Region))
configOptions = append(configOptions, awsConfig.WithHTTPClient(client))
// Only set static credentials if both access key and secret key are provided
// Otherwise, let AWS SDK use its default credential chain
if d.cfg.AccessKey != "" && d.cfg.SecretKey != "" {
credProvider := credentials.NewStaticCredentialsProvider(d.cfg.AccessKey, string(d.cfg.SecretKey), "")
configOptions = append(configOptions, awsConfig.WithCredentialsProvider(credProvider))
}
if d.cfg.Profile != "" {
configOptions = append(configOptions, awsConfig.WithSharedConfigProfile(d.cfg.Profile))
}
cfg, err := awsConfig.LoadDefaultConfig(ctx, configOptions...)
if err != nil {
d.logger.Error("Failed to create AWS config", "error", err)
return fmt.Errorf("could not create aws config: %w", err)
}
// If the role ARN is set, assume the role to get credentials and set the credentials provider in the config.
if d.cfg.RoleARN != "" {
assumeProvider := stscreds.NewAssumeRoleProvider(sts.NewFromConfig(cfg), d.cfg.RoleARN)
cfg.Credentials = aws.NewCredentialsCache(assumeProvider)
}
d.ecs = ecs.NewFromConfig(cfg, func(options *ecs.Options) {
if d.cfg.Endpoint != "" {
options.BaseEndpoint = &d.cfg.Endpoint
}
options.HTTPClient = client
})
d.ec2 = ec2.NewFromConfig(cfg, func(options *ec2.Options) {
options.HTTPClient = client
})
// Test credentials by making a simple API call
testCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
_, err = d.ecs.DescribeClusters(testCtx, &ecs.DescribeClustersInput{})
if err != nil {
d.logger.Error("Failed to test ECS credentials", "error", err)
return fmt.Errorf("ECS credential test failed: %w", err)
}
return nil
}
// listClusterARNs returns the ARNs of every cluster visible to the ECS client.
// ListClusters is paged through with up to 100 results per page. Pages are
// fetched one after another because each request needs the previous page's
// token.
func (d *ECSDiscovery) listClusterARNs(ctx context.Context) ([]string, error) {
	var (
		arns  []string
		token *string
	)
	for {
		page, err := d.ecs.ListClusters(ctx, &ecs.ListClustersInput{
			MaxResults: aws.Int32(100),
			NextToken:  token,
		})
		if err != nil {
			return nil, fmt.Errorf("could not list clusters: %w", err)
		}
		arns = append(arns, page.ClusterArns...)
		if page.NextToken == nil {
			return arns, nil
		}
		token = page.NextToken
	}
}
// describeClusters returns a map of cluster ARN to a slice of clusters.
// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling.
// Clusters are described in batches of 100 to respect AWS API limits (DescribeClusters allows up to 100 clusters per call).
func (d *ECSDiscovery) describeClusters(ctx context.Context, clusters []string) (map[string]types.Cluster, error) {
mu := sync.Mutex{}
clusterMap := make(map[string]types.Cluster)
errg, ectx := errgroup.WithContext(ctx)
errg.SetLimit(d.cfg.RequestConcurrency)
for batch := range slices.Chunk(clusters, 100) {
errg.Go(func() error {
resp, err := d.ecs.DescribeClusters(ectx, &ecs.DescribeClustersInput{
Clusters: batch,
Include: []types.ClusterField{"TAGS"},
})
if err != nil {
d.logger.Error("Failed to describe clusters", "clusters", batch, "error", err)
return fmt.Errorf("could not describe clusters %v: %w", batch, err)
}
for _, cluster := range resp.Clusters {
if cluster.ClusterArn != nil {
mu.Lock()
clusterMap[*cluster.ClusterArn] = cluster
mu.Unlock()
}
}
return nil
})
}
return clusterMap, errg.Wait()
}
// listServiceARNs collects the service ARNs of every given cluster. The result
// is keyed by the cluster identifier exactly as it was passed in.
//
// Each cluster gets its own goroutine that pages through ListServices (up to
// 100 results per page); at most RequestConcurrency of them run at once. The
// first error cancels the others and is returned alongside the partial map.
func (d *ECSDiscovery) listServiceARNs(ctx context.Context, clusters []string) (map[string][]string, error) {
	var (
		lock      sync.Mutex
		byCluster = make(map[string][]string)
	)
	group, groupCtx := errgroup.WithContext(ctx)
	group.SetLimit(d.cfg.RequestConcurrency)

	for _, clusterARN := range clusters {
		group.Go(func() error {
			var (
				collected []string
				token     *string
			)
			for {
				page, err := d.ecs.ListServices(groupCtx, &ecs.ListServicesInput{
					Cluster:    aws.String(clusterARN),
					MaxResults: aws.Int32(100),
					NextToken:  token,
				})
				if err != nil {
					return fmt.Errorf("could not list services for cluster %q: %w", clusterARN, err)
				}
				collected = append(collected, page.ServiceArns...)
				if page.NextToken == nil {
					break
				}
				token = page.NextToken
			}

			lock.Lock()
			defer lock.Unlock()
			byCluster[clusterARN] = collected
			return nil
		})
	}
	return byCluster, group.Wait()
}
// describeServices returns a map of service name to service.
// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling.
// Services are described in batches of 10 to respect AWS API limits (DescribeServices allows up to 10 services per call).
func (d *ECSDiscovery) describeServices(ctx context.Context, clusterARN string, serviceARNS []string) (map[string]types.Service, error) {
mu := sync.Mutex{}
services := make(map[string]types.Service)
errg, ectx := errgroup.WithContext(ctx)
errg.SetLimit(d.cfg.RequestConcurrency)
for batch := range slices.Chunk(serviceARNS, 10) {
errg.Go(func() error {
resp, err := d.ecs.DescribeServices(ectx, &ecs.DescribeServicesInput{
Cluster: aws.String(clusterARN),
Services: batch,
Include: []types.ServiceField{"TAGS"},
})
if err != nil {
d.logger.Error("Failed to describe services", "cluster", clusterARN, "batch", batch, "error", err)
return fmt.Errorf("could not describe services for cluster %q: batch %v: %w", clusterARN, batch, err)
}
for _, service := range resp.Services {
if service.ServiceArn != nil {
mu.Lock()
services[*service.ServiceName] = service
mu.Unlock()
}
}
return nil
})
}
return services, errg.Wait()
}
// listTaskARNs returns the task ARNs of each given cluster, keyed by the cluster
// identifier as passed in.
//
// Clusters are processed concurrently (at most RequestConcurrency at a time),
// and each one is paged through ListTasks with up to 100 results per page. The
// first error cancels the remaining clusters and is returned alongside the
// partial map.
func (d *ECSDiscovery) listTaskARNs(ctx context.Context, clusterARNs []string) (map[string][]string, error) {
	var mu sync.Mutex
	result := make(map[string][]string)

	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(d.cfg.RequestConcurrency)

	// listAll pages through ListTasks for a single cluster.
	listAll := func(clusterARN string) ([]string, error) {
		var (
			arns  []string
			token *string
		)
		for {
			out, err := d.ecs.ListTasks(gctx, &ecs.ListTasksInput{
				Cluster:    aws.String(clusterARN),
				MaxResults: aws.Int32(100),
				NextToken:  token,
			})
			if err != nil {
				return nil, fmt.Errorf("could not list tasks for cluster %q: %w", clusterARN, err)
			}
			arns = append(arns, out.TaskArns...)
			if out.NextToken == nil {
				return arns, nil
			}
			token = out.NextToken
		}
	}

	for _, clusterARN := range clusterARNs {
		g.Go(func() error {
			arns, err := listAll(clusterARN)
			if err != nil {
				return err
			}
			mu.Lock()
			result[clusterARN] = arns
			mu.Unlock()
			return nil
		})
	}
	return result, g.Wait()
}
// describeTasks describes the given tasks of one cluster, including their tags,
// and returns them in no particular order.
//
// Tasks are described in batches of 100, the DescribeTasks maximum, with at
// most RequestConcurrency requests in flight. The first error cancels the
// remaining requests; in that case no tasks are returned.
func (d *ECSDiscovery) describeTasks(ctx context.Context, clusterARN string, taskARNs []string) ([]types.Task, error) {
	var (
		mu    sync.Mutex
		tasks []types.Task
	)
	errg, ectx := errgroup.WithContext(ctx)
	errg.SetLimit(d.cfg.RequestConcurrency)
	for batch := range slices.Chunk(taskARNs, 100) {
		errg.Go(func() error {
			resp, err := d.ecs.DescribeTasks(ectx, &ecs.DescribeTasksInput{
				Cluster: aws.String(clusterARN),
				Tasks:   batch,
				Include: []types.TaskField{"TAGS"},
			})
			if err != nil {
				d.logger.Error("Failed to describe tasks", "cluster", clusterARN, "batch", batch, "error", err)
				return fmt.Errorf("could not describe tasks in cluster %q: batch %v: %w", clusterARN, batch, err)
			}
			mu.Lock()
			tasks = append(tasks, resp.Tasks...)
			mu.Unlock()
			return nil
		})
	}
	// Wait must finish before tasks is read. In "return tasks, errg.Wait()" the
	// Go spec leaves unspecified whether tasks is loaded before or after the
	// call, while the goroutines reassign it through append.
	if err := errg.Wait(); err != nil {
		return nil, err
	}
	return tasks, nil
}
// describeContainerInstances returns a map of container instance ARN to EC2 instance ID
// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling.
// Container instances are described in batches of 100 to respect AWS API limits (DescribeContainerInstances allows up to 100 container instances per call).
func (d *ECSDiscovery) describeContainerInstances(ctx context.Context, clusterARN string, tasks []types.Task) (map[string]string, error) {
containerInstanceARNs := make([]string, 0, len(tasks))
for _, task := range tasks {
if task.ContainerInstanceArn != nil {
containerInstanceARNs = append(containerInstanceARNs, *task.ContainerInstanceArn)
}
}
if len(containerInstanceARNs) == 0 {
return make(map[string]string), nil
}
mu := sync.Mutex{}
containerInstToEC2 := make(map[string]string)
errg, ectx := errgroup.WithContext(ctx)
errg.SetLimit(d.cfg.RequestConcurrency)
for batch := range slices.Chunk(containerInstanceARNs, 100) {
errg.Go(func() error {
resp, err := d.ecs.DescribeContainerInstances(ectx, &ecs.DescribeContainerInstancesInput{
Cluster: aws.String(clusterARN),
ContainerInstances: batch,
})
if err != nil {
return fmt.Errorf("could not describe container instances: %w", err)
}
for _, ci := range resp.ContainerInstances {
if ci.ContainerInstanceArn != nil && ci.Ec2InstanceId != nil {
mu.Lock()
containerInstToEC2[*ci.ContainerInstanceArn] = *ci.Ec2InstanceId
mu.Unlock()
}
}
return nil
})
}
return containerInstToEC2, errg.Wait()
}
// ec2InstanceInfo holds information retrieved from EC2 DescribeInstances.
// describeEC2Instances only creates entries for instances that have a private
// IP address.
type ec2InstanceInfo struct {
	privateIP    string            // private IPv4 address; set for every stored instance
	publicIP     string            // public IPv4 address, empty when the instance has none
	subnetID     string            // subnet of the instance, empty when not reported
	instanceType string            // EC2 instance type, empty when not reported
	tags         map[string]string // instance tags by tag key (non-nil)
}
// describeEC2Instances returns a map of EC2 instance ID to instance information
// for the given instance IDs.
//
// All IDs go into a single DescribeInstances request whose pages are fetched
// sequentially; this method does not use concurrency, so RequestConcurrency
// does not apply. Instances without an ID or a private IP address are left out
// of the result, because the private IP is what bridge/host mode tasks are
// scraped on.
func (d *ECSDiscovery) describeEC2Instances(ctx context.Context, instanceIDs []string) (map[string]ec2InstanceInfo, error) {
	instanceInfo := make(map[string]ec2InstanceInfo, len(instanceIDs))
	if len(instanceIDs) == 0 {
		return instanceInfo, nil
	}

	var nextToken *string
	for {
		resp, err := d.ec2.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
			InstanceIds: instanceIDs,
			NextToken:   nextToken,
		})
		if err != nil {
			return nil, fmt.Errorf("could not describe EC2 instances: %w", err)
		}
		for _, reservation := range resp.Reservations {
			for _, instance := range reservation.Instances {
				if instance.InstanceId == nil || instance.PrivateIpAddress == nil {
					continue
				}
				info := ec2InstanceInfo{
					privateIP:    *instance.PrivateIpAddress,
					instanceType: string(instance.InstanceType),
					tags:         make(map[string]string, len(instance.Tags)),
				}
				if instance.PublicIpAddress != nil {
					info.publicIP = *instance.PublicIpAddress
				}
				if instance.SubnetId != nil {
					info.subnetID = *instance.SubnetId
				}
				for _, tag := range instance.Tags {
					if tag.Key != nil && tag.Value != nil {
						info.tags[*tag.Key] = *tag.Value
					}
				}
				instanceInfo[*instance.InstanceId] = info
			}
		}
		if resp.NextToken == nil {
			return instanceInfo, nil
		}
		nextToken = resp.NextToken
	}
}
// describeNetworkInterfaces returns a map of ENI ID to public IP address for the
// ENIs attached to the given tasks. In awsvpc mode the public IP belongs to the
// task's ENI rather than to the EC2 instance, so it has to be looked up here.
//
// Only the first ElasticNetworkInterface attachment of each task is considered.
// ENIs without a public IP are absent from the result. Pages are fetched
// sequentially, without concurrency.
func (d *ECSDiscovery) describeNetworkInterfaces(ctx context.Context, tasks []types.Task) (map[string]string, error) {
	eniIDs := make([]string, 0, len(tasks))
	for _, task := range tasks {
		idx := slices.IndexFunc(task.Attachments, func(a types.Attachment) bool {
			return a.Type != nil && *a.Type == "ElasticNetworkInterface"
		})
		if idx < 0 {
			continue
		}
		for _, detail := range task.Attachments[idx].Details {
			if detail.Name != nil && *detail.Name == "networkInterfaceId" && detail.Value != nil {
				eniIDs = append(eniIDs, *detail.Value)
				break
			}
		}
	}

	eniToPublicIP := make(map[string]string)
	if len(eniIDs) == 0 {
		return eniToPublicIP, nil
	}

	var token *string
	for {
		out, err := d.ec2.DescribeNetworkInterfaces(ctx, &ec2.DescribeNetworkInterfacesInput{
			NetworkInterfaceIds: eniIDs,
			NextToken:           token,
		})
		if err != nil {
			return nil, fmt.Errorf("could not describe network interfaces: %w", err)
		}
		for _, eni := range out.NetworkInterfaces {
			if eni.NetworkInterfaceId == nil || eni.Association == nil || eni.Association.PublicIp == nil {
				continue
			}
			eniToPublicIP[*eni.NetworkInterfaceId] = *eni.Association.PublicIp
		}
		if out.NextToken == nil {
			return eniToPublicIP, nil
		}
		token = out.NextToken
	}
}
// refresh discovers the tasks of the configured clusters, or of every cluster
// when none are configured. It returns them as a single target group whose
// source is the region.
//
// Cluster details, service ARNs and task ARNs are fetched first; an error there
// fails the whole refresh. Each cluster with tasks is then processed
// concurrently by refreshCluster, where an error is logged and only drops that
// cluster's targets.
func (d *ECSDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
	if err := d.initEcsClient(ctx); err != nil {
		return nil, err
	}

	clusters := d.cfg.Clusters
	if len(clusters) == 0 {
		var err error
		clusters, err = d.listClusterARNs(ctx)
		if err != nil {
			return nil, err
		}
	}

	tg := &targetgroup.Group{
		Source: d.cfg.Region,
	}
	if len(clusters) == 0 {
		return []*targetgroup.Group{tg}, nil
	}

	// Fetch cluster details, service ARNs, and task ARNs in parallel.
	var (
		clusterMap map[string]types.Cluster
		serviceMap map[string][]string
		taskMap    map[string][]string
	)
	clusterErrg, clusterCtx := errgroup.WithContext(ctx)
	clusterErrg.Go(func() error {
		var err error
		clusterMap, err = d.describeClusters(clusterCtx, clusters)
		return err
	})
	clusterErrg.Go(func() error {
		var err error
		serviceMap, err = d.listServiceARNs(clusterCtx, clusters)
		return err
	})
	clusterErrg.Go(func() error {
		var err error
		taskMap, err = d.listTaskARNs(clusterCtx, clusters)
		return err
	})
	if err := clusterErrg.Wait(); err != nil {
		return nil, err
	}

	// Process clusters in parallel. Each goroutine keeps its own state and only
	// touches the shared targets slice under mu.
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		targets []model.LabelSet
	)
	for clusterKey, taskARNs := range taskMap {
		if len(taskARNs) == 0 {
			continue
		}
		// taskMap is keyed by the identifiers we queried with (configured values
		// or ARNs), while clusterMap is keyed by ARN. A cluster that cannot be
		// matched would have a nil ARN, and dereferencing it would panic.
		cluster, ok := d.lookupCluster(clusterMap, clusterKey)
		if !ok {
			d.logger.Warn("Cluster not returned by DescribeClusters, skipping", "cluster", clusterKey)
			continue
		}
		wg.Add(1)
		go func(cluster types.Cluster, serviceARNs, taskARNs []string) {
			defer wg.Done()
			clusterTargets := d.refreshCluster(ctx, cluster, serviceARNs, taskARNs)
			mu.Lock()
			targets = append(targets, clusterTargets...)
			mu.Unlock()
		}(cluster, serviceMap[clusterKey], taskARNs)
	}
	wg.Wait()

	tg.Targets = targets
	return []*targetgroup.Group{tg}, nil
}

// lookupCluster finds the described cluster for key, which is either a cluster
// ARN (the key of clusterMap) or a cluster name. It reports false when no
// described cluster with a non-nil ARN matches.
func (*ECSDiscovery) lookupCluster(clusterMap map[string]types.Cluster, key string) (types.Cluster, bool) {
	if cluster, ok := clusterMap[key]; ok && cluster.ClusterArn != nil {
		return cluster, true
	}
	for _, cluster := range clusterMap {
		if cluster.ClusterArn != nil && cluster.ClusterName != nil && *cluster.ClusterName == key {
			return cluster, true
		}
	}
	return types.Cluster{}, false
}

// refreshCluster builds the targets of one cluster. It describes the cluster's
// services and tasks, resolves the container instances, EC2 instances and ENIs
// the tasks use, and returns one label set per task that has an IP address.
// Any API error is logged and yields no targets for this cluster.
// cluster.ClusterArn must be non-nil.
func (d *ECSDiscovery) refreshCluster(ctx context.Context, cluster types.Cluster, serviceARNs, taskARNs []string) []model.LabelSet {
	clusterARN := *cluster.ClusterArn

	// Services and tasks are independent; fetch them in parallel.
	var (
		services map[string]types.Service
		tasks    []types.Task
	)
	resourceErrg, resourceCtx := errgroup.WithContext(ctx)
	resourceErrg.Go(func() error {
		var err error
		services, err = d.describeServices(resourceCtx, clusterARN, serviceARNs)
		if err != nil {
			d.logger.Error("Failed to describe services for cluster", "cluster", clusterARN, "error", err)
		}
		return err
	})
	resourceErrg.Go(func() error {
		var err error
		tasks, err = d.describeTasks(resourceCtx, clusterARN, taskARNs)
		if err != nil {
			d.logger.Error("Failed to describe tasks for cluster", "cluster", clusterARN, "error", err)
		}
		return err
	})
	if resourceErrg.Wait() != nil {
		return nil
	}

	// Container instances and network interfaces both depend on the tasks but not
	// on each other; fetch them in parallel.
	var (
		containerInstances map[string]string
		eniToPublicIP      map[string]string
	)
	instanceErrg, instanceCtx := errgroup.WithContext(ctx)
	instanceErrg.Go(func() error {
		var err error
		containerInstances, err = d.describeContainerInstances(instanceCtx, clusterARN, tasks)
		if err != nil {
			d.logger.Error("Failed to describe container instances for cluster", "cluster", clusterARN, "error", err)
		}
		return err
	})
	instanceErrg.Go(func() error {
		var err error
		eniToPublicIP, err = d.describeNetworkInterfaces(instanceCtx, tasks)
		if err != nil {
			d.logger.Error("Failed to describe network interfaces for cluster", "cluster", clusterARN, "error", err)
		}
		return err
	})
	if instanceErrg.Wait() != nil {
		return nil
	}

	ec2Instances := make(map[string]ec2InstanceInfo)
	if len(containerInstances) > 0 {
		// Deduplicate EC2 instance IDs (multiple tasks can share an instance).
		ec2InstanceIDSet := make(map[string]struct{}, len(containerInstances))
		for _, ec2ID := range containerInstances {
			ec2InstanceIDSet[ec2ID] = struct{}{}
		}
		ec2InstanceIDs := make([]string, 0, len(ec2InstanceIDSet))
		for ec2ID := range ec2InstanceIDSet {
			ec2InstanceIDs = append(ec2InstanceIDs, ec2ID)
		}
		// A local err: the previous code assigned the err shared by all cluster
		// goroutines, which was a data race.
		var err error
		ec2Instances, err = d.describeEC2Instances(ctx, ec2InstanceIDs)
		if err != nil {
			d.logger.Error("Failed to describe EC2 instances for cluster", "cluster", clusterARN, "error", err)
			return nil
		}
	}

	// Building label sets is cheap CPU-only work, so tasks are handled in order
	// rather than with a goroutine each.
	targets := make([]model.LabelSet, 0, len(tasks))
	for _, task := range tasks {
		if labels := d.taskLabels(cluster, services, task, containerInstances, ec2Instances, eniToPublicIP); labels != nil {
			targets = append(targets, labels)
		}
	}
	return targets
}

// taskLabels builds the target label set of a single task. It returns nil when
// no IP address can be determined, in which case the task is not a target.
//
// With an ElasticNetworkInterface attachment (awsvpc mode), the address, subnet
// and ENI come from the attachment details and the public IP from
// eniToPublicIP. Otherwise, for tasks on a container instance (bridge/host
// mode), the address, subnet and public IP are those of the EC2 host.
func (d *ECSDiscovery) taskLabels(
	cluster types.Cluster,
	services map[string]types.Service,
	task types.Task,
	containerInstances map[string]string,
	ec2Instances map[string]ec2InstanceInfo,
	eniToPublicIP map[string]string,
) model.LabelSet {
	// deref reads an optional AWS string field, treating nil as "".
	deref := func(p *string) string {
		if p == nil {
			return ""
		}
		return *p
	}
	taskARN := deref(task.TaskArn)

	// Resolve the EC2 host of tasks placed on a container instance.
	var (
		ec2InstanceID string
		ciFound       bool
		ec2Info       ec2InstanceInfo
		ec2InfoFound  bool
	)
	if task.ContainerInstanceArn != nil {
		ec2InstanceID, ciFound = containerInstances[*task.ContainerInstanceArn]
		if ciFound {
			ec2Info, ec2InfoFound = ec2Instances[ec2InstanceID]
		}
	}

	var eniAttachment *types.Attachment
	for i := range task.Attachments {
		if deref(task.Attachments[i].Type) == "ElasticNetworkInterface" {
			eniAttachment = &task.Attachments[i]
			break
		}
	}

	var ipAddress, subnetID, publicIP, networkMode string
	switch {
	case eniAttachment != nil:
		// awsvpc networking mode - get IP from ENI.
		networkMode = "awsvpc"
		var eniID string
		for _, detail := range eniAttachment.Details {
			switch deref(detail.Name) {
			case "privateIPv4Address":
				ipAddress = deref(detail.Value)
			case "subnetId":
				subnetID = deref(detail.Value)
			case "networkInterfaceId":
				eniID = deref(detail.Value)
			}
		}
		if eniID != "" {
			publicIP = eniToPublicIP[eniID]
		}
	case task.ContainerInstanceArn != nil:
		// bridge/host networking mode - use the EC2 instance's IP and subnet.
		networkMode = "bridge"
		switch {
		case ec2InfoFound:
			ipAddress = ec2Info.privateIP
			publicIP = ec2Info.publicIP
			subnetID = ec2Info.subnetID
		case ciFound:
			d.logger.Debug("EC2 instance info not found", "instance", ec2InstanceID, "task", taskARN)
		default:
			d.logger.Debug("Container instance not found in map", "arn", *task.ContainerInstanceArn, "task", taskARN)
		}
	}

	if ipAddress == "" {
		return nil
	}

	labels := model.LabelSet{
		ecsLabelClusterARN:       model.LabelValue(deref(cluster.ClusterArn)),
		ecsLabelCluster:          model.LabelValue(deref(cluster.ClusterName)),
		ecsLabelTaskGroup:        model.LabelValue(deref(task.Group)),
		ecsLabelTaskARN:          model.LabelValue(taskARN),
		ecsLabelTaskDefinition:   model.LabelValue(deref(task.TaskDefinitionArn)),
		ecsLabelIPAddress:        model.LabelValue(ipAddress),
		ecsLabelRegion:           model.LabelValue(d.cfg.Region),
		ecsLabelLaunchType:       model.LabelValue(task.LaunchType),
		ecsLabelAvailabilityZone: model.LabelValue(deref(task.AvailabilityZone)),
		ecsLabelDesiredStatus:    model.LabelValue(deref(task.DesiredStatus)),
		ecsLabelLastStatus:       model.LabelValue(deref(task.LastStatus)),
		ecsLabelHealthStatus:     model.LabelValue(task.HealthStatus),
		ecsLabelNetworkMode:      model.LabelValue(networkMode),
		model.AddressLabel:       model.LabelValue(net.JoinHostPort(ipAddress, strconv.Itoa(d.cfg.Port))),
	}

	if subnetID != "" {
		labels[ecsLabelSubnetID] = model.LabelValue(subnetID)
	}
	// Container instance and EC2 host details, when the task runs on one.
	if task.ContainerInstanceArn != nil {
		labels[ecsLabelContainerInstanceARN] = model.LabelValue(*task.ContainerInstanceArn)
	}
	if ec2InstanceID != "" {
		labels[ecsLabelEC2InstanceID] = model.LabelValue(ec2InstanceID)
	}
	if ec2InfoFound {
		if ec2Info.instanceType != "" {
			labels[ecsLabelEC2InstanceType] = model.LabelValue(ec2Info.instanceType)
		}
		if ec2Info.privateIP != "" {
			labels[ecsLabelEC2InstancePrivateIP] = model.LabelValue(ec2Info.privateIP)
		}
		if ec2Info.publicIP != "" {
			labels[ecsLabelEC2InstancePublicIP] = model.LabelValue(ec2Info.publicIP)
		}
	}
	if publicIP != "" {
		labels[ecsLabelPublicIP] = model.LabelValue(publicIP)
	}
	if task.PlatformFamily != nil {
		labels[ecsLabelPlatformFamily] = model.LabelValue(*task.PlatformFamily)
	}
	if task.PlatformVersion != nil {
		labels[ecsLabelPlatformVersion] = model.LabelValue(*task.PlatformVersion)
	}

	for _, clusterTag := range cluster.Tags {
		if clusterTag.Key != nil && clusterTag.Value != nil {
			labels[model.LabelName(ecsLabelTagCluster+strutil.SanitizeLabelName(*clusterTag.Key))] = model.LabelValue(*clusterTag.Value)
		}
	}

	// Only tasks whose group is "service:<name>" belong to a service. The
	// explicit prefix check also guards against nil groups and custom RunTask
	// groups, which getServiceNameFromTaskGroup could not handle.
	if task.Group != nil && !isStandaloneTask(task) && strings.HasPrefix(*task.Group, "service:") {
		serviceName := getServiceNameFromTaskGroup(task)
		service, ok := services[serviceName]
		if !ok {
			d.logger.Debug("Service not found for task", "task", taskARN, "service", serviceName)
		}
		if service.ServiceName != nil {
			labels[ecsLabelService] = model.LabelValue(*service.ServiceName)
		}
		if service.ServiceArn != nil {
			labels[ecsLabelServiceARN] = model.LabelValue(*service.ServiceArn)
		}
		if service.Status != nil {
			labels[ecsLabelServiceStatus] = model.LabelValue(*service.Status)
		}
		for _, serviceTag := range service.Tags {
			if serviceTag.Key != nil && serviceTag.Value != nil {
				labels[model.LabelName(ecsLabelTagService+strutil.SanitizeLabelName(*serviceTag.Key))] = model.LabelValue(*serviceTag.Value)
			}
		}
	}

	for _, taskTag := range task.Tags {
		if taskTag.Key != nil && taskTag.Value != nil {
			labels[model.LabelName(ecsLabelTagTask+strutil.SanitizeLabelName(*taskTag.Key))] = model.LabelValue(*taskTag.Value)
		}
	}

	if ec2InstanceID != "" && ec2InfoFound {
		for tagKey, tagValue := range ec2Info.tags {
			labels[model.LabelName(ecsLabelTagEC2+strutil.SanitizeLabelName(tagKey))] = model.LabelValue(tagValue)
		}
	}

	return labels
}
// isStandaloneTask reports whether task was not started by an ECS service.
//
// Service tasks carry the group "service:<service-name>". Anything else is
// standalone: the default "family:<family>" group of RunTask, a custom group
// passed to RunTask, or no group at all. The previous check only recognized
// "family:", so custom groups and nil groups were treated as service tasks.
func isStandaloneTask(task types.Task) bool {
	return task.Group == nil || !strings.HasPrefix(*task.Group, "service:")
}
// getServiceNameFromTaskGroup returns the second ':'-separated field of the
// task's group, i.e. "<name>" for a "service:<name>" group. It returns "" when
// the task has no group or the group contains no ':'. Previously both cases
// panicked, with a nil dereference and an index out of range respectively.
func getServiceNameFromTaskGroup(task types.Task) string {
	if task.Group == nil {
		return ""
	}
	parts := strings.Split(*task.Group, ":")
	if len(parts) < 2 {
		return ""
	}
	return parts[1]
}