mirror of
https://github.com/prometheus/prometheus.git
synced 2026-03-30 02:41:56 +02:00
996 lines
33 KiB
Go
996 lines
33 KiB
Go
// Copyright The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package aws
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"net"
|
|
"slices"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
awsConfig "github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
|
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
|
|
"github.com/aws/aws-sdk-go-v2/service/ec2"
|
|
"github.com/aws/aws-sdk-go-v2/service/ecs"
|
|
"github.com/aws/aws-sdk-go-v2/service/ecs/types"
|
|
"github.com/aws/aws-sdk-go-v2/service/sts"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/common/config"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/prometheus/common/promslog"
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"github.com/prometheus/prometheus/discovery"
|
|
"github.com/prometheus/prometheus/discovery/refresh"
|
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
|
"github.com/prometheus/prometheus/util/strutil"
|
|
)
|
|
|
|
// Meta labels attached to targets discovered by ECS service discovery. Every
// label name starts with model.MetaLabelPrefix + "ecs_".
const (
	// ecsLabel is the common prefix of all ECS meta labels.
	ecsLabel = model.MetaLabelPrefix + "ecs_"

	// Cluster, service and task identity.
	ecsLabelCluster        = ecsLabel + "cluster"
	ecsLabelClusterARN     = ecsLabel + "cluster_arn"
	ecsLabelService        = ecsLabel + "service"
	ecsLabelServiceARN     = ecsLabel + "service_arn"
	ecsLabelServiceStatus  = ecsLabel + "service_status"
	ecsLabelTaskGroup      = ecsLabel + "task_group"
	ecsLabelTaskARN        = ecsLabel + "task_arn"
	ecsLabelTaskDefinition = ecsLabel + "task_definition"

	// Location and addressing of the task.
	ecsLabelRegion           = ecsLabel + "region"
	ecsLabelAvailabilityZone = ecsLabel + "availability_zone"
	ecsLabelSubnetID         = ecsLabel + "subnet_id"
	ecsLabelIPAddress        = ecsLabel + "ip_address"

	// Task state and platform.
	ecsLabelLaunchType      = ecsLabel + "launch_type"
	ecsLabelDesiredStatus   = ecsLabel + "desired_status"
	ecsLabelLastStatus      = ecsLabel + "last_status"
	ecsLabelHealthStatus    = ecsLabel + "health_status"
	ecsLabelPlatformFamily  = ecsLabel + "platform_family"
	ecsLabelPlatformVersion = ecsLabel + "platform_version"

	// Prefixes for resource tags; the sanitized tag key is appended to them.
	ecsLabelTag        = ecsLabel + "tag_"
	ecsLabelTagCluster = ecsLabelTag + "cluster_"
	ecsLabelTagService = ecsLabelTag + "service_"
	ecsLabelTagTask    = ecsLabelTag + "task_"
	ecsLabelTagEC2     = ecsLabelTag + "ec2_"

	// Network mode ("awsvpc" or "bridge" as set by refresh) and, for tasks
	// running on EC2, details of the host container/EC2 instance.
	ecsLabelNetworkMode          = ecsLabel + "network_mode"
	ecsLabelContainerInstanceARN = ecsLabel + "container_instance_arn"
	ecsLabelEC2InstanceID        = ecsLabel + "ec2_instance_id"
	ecsLabelEC2InstanceType      = ecsLabel + "ec2_instance_type"
	ecsLabelEC2InstancePrivateIP = ecsLabel + "ec2_instance_private_ip"
	ecsLabelEC2InstancePublicIP  = ecsLabel + "ec2_instance_public_ip"

	// Public IP of the task: taken from its ENI in awsvpc mode, otherwise
	// from the EC2 host instance.
	ecsLabelPublicIP = ecsLabel + "public_ip"
)
|
|
|
|
// DefaultECSSDConfig is the default ECS SD configuration. UnmarshalYAML starts
// from these values before applying the user-provided settings.
var DefaultECSSDConfig = ECSSDConfig{
	// Port is combined with each discovered IP address to form __address__.
	Port: 80,
	// RefreshInterval is how often the ECS APIs are polled.
	RefreshInterval:    model.Duration(60 * time.Second),
	RequestConcurrency: 20, // Aligned with AWS ECS API sustained rate limits (20 req/sec)
	HTTPClientConfig:   config.DefaultHTTPClientConfig,
}
|
|
|
|
func init() {
|
|
discovery.RegisterConfig(&ECSSDConfig{})
|
|
}
|
|
|
|
// ECSSDConfig is the configuration for ECS based service discovery.
type ECSSDConfig struct {
	// Region is the AWS region to discover in. UnmarshalYAML passes it through
	// loadRegion; presumably that fills in a default when it is empty (TODO confirm).
	Region string `yaml:"region"`
	// Endpoint optionally overrides the ECS API base endpoint. It is applied to
	// the ECS client only, not to the EC2 client.
	Endpoint string `yaml:"endpoint"`
	// AccessKey and SecretKey are used as static credentials only when both are
	// set; otherwise the AWS SDK default credential chain is used.
	AccessKey string        `yaml:"access_key,omitempty"`
	SecretKey config.Secret `yaml:"secret_key,omitempty"`
	// Profile selects a shared AWS config profile.
	Profile string `yaml:"profile,omitempty"`
	// RoleARN, when set, is assumed through STS and its credentials are used
	// for all API calls.
	RoleARN string `yaml:"role_arn,omitempty"`
	// Clusters restricts discovery to these clusters. When empty, every cluster
	// returned by ListClusters in the region is scanned.
	Clusters []string `yaml:"clusters,omitempty"`
	// Port is the port joined with each target IP to build __address__.
	Port int `yaml:"port"`
	// RefreshInterval is how often targets are refreshed.
	RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`

	// RequestConcurrency controls the maximum number of concurrent ECS API requests.
	// Default is 20, which aligns with AWS ECS sustained rate limits:
	// - Cluster read actions (DescribeClusters, ListClusters): 20 req/sec sustained
	// - Service read actions (DescribeServices, ListServices): 20 req/sec sustained
	// - Cluster resource read actions (DescribeTasks, ListTasks): 20 req/sec sustained
	// See: https://docs.aws.amazon.com/AmazonECS/latest/APIReference/request-throttling.html
	// The limit is applied per errgroup (per describe/list helper call), so the
	// total number of in-flight requests can exceed it when helpers run in parallel.
	RequestConcurrency int `yaml:"request_concurrency,omitempty"`

	HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
}
|
|
|
|
// NewDiscovererMetrics implements discovery.Config.
|
|
func (*ECSSDConfig) NewDiscovererMetrics(_ prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
|
|
return &ecsMetrics{
|
|
refreshMetrics: rmi,
|
|
}
|
|
}
|
|
|
|
// Name returns the name of the ECS Config.
|
|
func (*ECSSDConfig) Name() string { return "ecs" }
|
|
|
|
// NewDiscoverer returns a Discoverer for the EC2 Config.
|
|
func (c *ECSSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
|
|
return NewECSDiscovery(c, opts)
|
|
}
|
|
|
|
// UnmarshalYAML implements the yaml.Unmarshaler interface for the ECS Config.
|
|
func (c *ECSSDConfig) UnmarshalYAML(unmarshal func(any) error) error {
|
|
*c = DefaultECSSDConfig
|
|
type plain ECSSDConfig
|
|
err := unmarshal((*plain)(c))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.Region, err = loadRegion(context.Background(), c.Region)
|
|
if err != nil {
|
|
return fmt.Errorf("could not determine AWS region: %w", err)
|
|
}
|
|
|
|
return c.HTTPClientConfig.Validate()
|
|
}
|
|
|
|
// ecsClient is the subset of the AWS SDK v2 ECS API used by ECS service
// discovery. initEcsClient stores an *ecs.Client in this field type; keeping it
// an interface presumably lets tests substitute a fake (TODO confirm).
type ecsClient interface {
	ListClusters(context.Context, *ecs.ListClustersInput, ...func(*ecs.Options)) (*ecs.ListClustersOutput, error)
	DescribeClusters(context.Context, *ecs.DescribeClustersInput, ...func(*ecs.Options)) (*ecs.DescribeClustersOutput, error)
	ListServices(context.Context, *ecs.ListServicesInput, ...func(*ecs.Options)) (*ecs.ListServicesOutput, error)
	DescribeServices(context.Context, *ecs.DescribeServicesInput, ...func(*ecs.Options)) (*ecs.DescribeServicesOutput, error)
	ListTasks(context.Context, *ecs.ListTasksInput, ...func(*ecs.Options)) (*ecs.ListTasksOutput, error)
	DescribeTasks(context.Context, *ecs.DescribeTasksInput, ...func(*ecs.Options)) (*ecs.DescribeTasksOutput, error)
	// DescribeContainerInstances maps container instances to their EC2 instance IDs.
	DescribeContainerInstances(context.Context, *ecs.DescribeContainerInstancesInput, ...func(*ecs.Options)) (*ecs.DescribeContainerInstancesOutput, error)
}
|
|
|
|
// ecsEC2Client is the subset of the AWS SDK v2 EC2 API used by ECS service
// discovery to enrich task targets with host-instance and ENI details.
type ecsEC2Client interface {
	// DescribeInstances provides IPs, subnet, type and tags of EC2 hosts.
	DescribeInstances(context.Context, *ec2.DescribeInstancesInput, ...func(*ec2.Options)) (*ec2.DescribeInstancesOutput, error)
	// DescribeNetworkInterfaces provides public IPs of awsvpc task ENIs.
	DescribeNetworkInterfaces(context.Context, *ec2.DescribeNetworkInterfacesInput, ...func(*ec2.Options)) (*ec2.DescribeNetworkInterfacesOutput, error)
}
|
|
|
|
// ECSDiscovery periodically performs ECS-SD requests. It implements
// the Discoverer interface.
type ECSDiscovery struct {
	// Discovery drives refresh on the configured interval.
	*refresh.Discovery
	logger *slog.Logger
	cfg    *ECSSDConfig
	// ecs and ec2 are populated lazily by initEcsClient, which refresh calls
	// at the start of every refresh.
	ecs ecsClient
	ec2 ecsEC2Client
}
|
|
|
|
// NewECSDiscovery returns a new ECSDiscovery which periodically refreshes its targets.
|
|
func NewECSDiscovery(conf *ECSSDConfig, opts discovery.DiscovererOptions) (*ECSDiscovery, error) {
|
|
m, ok := opts.Metrics.(*ecsMetrics)
|
|
if !ok {
|
|
return nil, errors.New("invalid discovery metrics type")
|
|
}
|
|
|
|
if opts.Logger == nil {
|
|
opts.Logger = promslog.NewNopLogger()
|
|
}
|
|
d := &ECSDiscovery{
|
|
logger: opts.Logger,
|
|
cfg: conf,
|
|
}
|
|
d.Discovery = refresh.NewDiscovery(
|
|
refresh.Options{
|
|
Logger: opts.Logger,
|
|
Mech: "ecs",
|
|
Interval: time.Duration(d.cfg.RefreshInterval),
|
|
RefreshF: d.refresh,
|
|
MetricsInstantiator: m.refreshMetrics,
|
|
},
|
|
)
|
|
return d, nil
|
|
}
|
|
|
|
func (d *ECSDiscovery) initEcsClient(ctx context.Context) error {
|
|
if d.ecs != nil && d.ec2 != nil {
|
|
return nil
|
|
}
|
|
|
|
if d.cfg.Region == "" {
|
|
return errors.New("region must be set for ECS service discovery")
|
|
}
|
|
|
|
// Build the HTTP client from the provided HTTPClientConfig.
|
|
client, err := config.NewClientFromConfig(d.cfg.HTTPClientConfig, "ecs_sd")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Build the AWS config with the provided region.
|
|
var configOptions []func(*awsConfig.LoadOptions) error
|
|
configOptions = append(configOptions, awsConfig.WithRegion(d.cfg.Region))
|
|
configOptions = append(configOptions, awsConfig.WithHTTPClient(client))
|
|
|
|
// Only set static credentials if both access key and secret key are provided
|
|
// Otherwise, let AWS SDK use its default credential chain
|
|
if d.cfg.AccessKey != "" && d.cfg.SecretKey != "" {
|
|
credProvider := credentials.NewStaticCredentialsProvider(d.cfg.AccessKey, string(d.cfg.SecretKey), "")
|
|
configOptions = append(configOptions, awsConfig.WithCredentialsProvider(credProvider))
|
|
}
|
|
|
|
if d.cfg.Profile != "" {
|
|
configOptions = append(configOptions, awsConfig.WithSharedConfigProfile(d.cfg.Profile))
|
|
}
|
|
|
|
cfg, err := awsConfig.LoadDefaultConfig(ctx, configOptions...)
|
|
if err != nil {
|
|
d.logger.Error("Failed to create AWS config", "error", err)
|
|
return fmt.Errorf("could not create aws config: %w", err)
|
|
}
|
|
|
|
// If the role ARN is set, assume the role to get credentials and set the credentials provider in the config.
|
|
if d.cfg.RoleARN != "" {
|
|
assumeProvider := stscreds.NewAssumeRoleProvider(sts.NewFromConfig(cfg), d.cfg.RoleARN)
|
|
cfg.Credentials = aws.NewCredentialsCache(assumeProvider)
|
|
}
|
|
|
|
d.ecs = ecs.NewFromConfig(cfg, func(options *ecs.Options) {
|
|
if d.cfg.Endpoint != "" {
|
|
options.BaseEndpoint = &d.cfg.Endpoint
|
|
}
|
|
options.HTTPClient = client
|
|
})
|
|
|
|
d.ec2 = ec2.NewFromConfig(cfg, func(options *ec2.Options) {
|
|
options.HTTPClient = client
|
|
})
|
|
|
|
// Test credentials by making a simple API call
|
|
testCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
|
defer cancel()
|
|
|
|
_, err = d.ecs.DescribeClusters(testCtx, &ecs.DescribeClustersInput{})
|
|
if err != nil {
|
|
d.logger.Error("Failed to test ECS credentials", "error", err)
|
|
return fmt.Errorf("ECS credential test failed: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// listClusterARNs returns a slice of cluster arns.
|
|
// This method does not use concurrency as it's a simple paginated call.
|
|
func (d *ECSDiscovery) listClusterARNs(ctx context.Context) ([]string, error) {
|
|
var (
|
|
clusterARNs []string
|
|
nextToken *string
|
|
)
|
|
for {
|
|
resp, err := d.ecs.ListClusters(ctx, &ecs.ListClustersInput{
|
|
NextToken: nextToken,
|
|
MaxResults: aws.Int32(100),
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not list clusters: %w", err)
|
|
}
|
|
|
|
clusterARNs = append(clusterARNs, resp.ClusterArns...)
|
|
|
|
if resp.NextToken == nil {
|
|
break
|
|
}
|
|
nextToken = resp.NextToken
|
|
}
|
|
|
|
return clusterARNs, nil
|
|
}
|
|
|
|
// describeClusters returns a map of cluster ARN to a slice of clusters.
|
|
// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling.
|
|
// Clusters are described in batches of 100 to respect AWS API limits (DescribeClusters allows up to 100 clusters per call).
|
|
func (d *ECSDiscovery) describeClusters(ctx context.Context, clusters []string) (map[string]types.Cluster, error) {
|
|
mu := sync.Mutex{}
|
|
clusterMap := make(map[string]types.Cluster)
|
|
errg, ectx := errgroup.WithContext(ctx)
|
|
errg.SetLimit(d.cfg.RequestConcurrency)
|
|
for batch := range slices.Chunk(clusters, 100) {
|
|
errg.Go(func() error {
|
|
resp, err := d.ecs.DescribeClusters(ectx, &ecs.DescribeClustersInput{
|
|
Clusters: batch,
|
|
Include: []types.ClusterField{"TAGS"},
|
|
})
|
|
if err != nil {
|
|
d.logger.Error("Failed to describe clusters", "clusters", batch, "error", err)
|
|
return fmt.Errorf("could not describe clusters %v: %w", batch, err)
|
|
}
|
|
|
|
for _, cluster := range resp.Clusters {
|
|
if cluster.ClusterArn != nil {
|
|
mu.Lock()
|
|
clusterMap[*cluster.ClusterArn] = cluster
|
|
mu.Unlock()
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
return clusterMap, errg.Wait()
|
|
}
|
|
|
|
// listServiceARNs returns a map of cluster ARN to a slice of service ARNs.
|
|
// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling.
|
|
// Services are listed in batches of 100 to respect AWS API limits (ListServices allows up to 100 services per call).
|
|
func (d *ECSDiscovery) listServiceARNs(ctx context.Context, clusters []string) (map[string][]string, error) {
|
|
mu := sync.Mutex{}
|
|
services := make(map[string][]string)
|
|
errg, ectx := errgroup.WithContext(ctx)
|
|
errg.SetLimit(d.cfg.RequestConcurrency)
|
|
for _, clusterARN := range clusters {
|
|
errg.Go(func() error {
|
|
var nextToken *string
|
|
var serviceARNs []string
|
|
for {
|
|
resp, err := d.ecs.ListServices(ectx, &ecs.ListServicesInput{
|
|
Cluster: aws.String(clusterARN),
|
|
NextToken: nextToken,
|
|
MaxResults: aws.Int32(100),
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("could not list services for cluster %q: %w", clusterARN, err)
|
|
}
|
|
|
|
serviceARNs = append(serviceARNs, resp.ServiceArns...)
|
|
|
|
if resp.NextToken == nil {
|
|
break
|
|
}
|
|
nextToken = resp.NextToken
|
|
}
|
|
|
|
mu.Lock()
|
|
services[clusterARN] = serviceARNs
|
|
mu.Unlock()
|
|
return nil
|
|
})
|
|
}
|
|
|
|
return services, errg.Wait()
|
|
}
|
|
|
|
// describeServices returns a map of service name to service.
|
|
// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling.
|
|
// Services are described in batches of 10 to respect AWS API limits (DescribeServices allows up to 10 services per call).
|
|
func (d *ECSDiscovery) describeServices(ctx context.Context, clusterARN string, serviceARNS []string) (map[string]types.Service, error) {
|
|
mu := sync.Mutex{}
|
|
services := make(map[string]types.Service)
|
|
errg, ectx := errgroup.WithContext(ctx)
|
|
errg.SetLimit(d.cfg.RequestConcurrency)
|
|
for batch := range slices.Chunk(serviceARNS, 10) {
|
|
errg.Go(func() error {
|
|
resp, err := d.ecs.DescribeServices(ectx, &ecs.DescribeServicesInput{
|
|
Cluster: aws.String(clusterARN),
|
|
Services: batch,
|
|
Include: []types.ServiceField{"TAGS"},
|
|
})
|
|
if err != nil {
|
|
d.logger.Error("Failed to describe services", "cluster", clusterARN, "batch", batch, "error", err)
|
|
return fmt.Errorf("could not describe services for cluster %q: batch %v: %w", clusterARN, batch, err)
|
|
}
|
|
|
|
for _, service := range resp.Services {
|
|
if service.ServiceArn != nil {
|
|
mu.Lock()
|
|
services[*service.ServiceName] = service
|
|
mu.Unlock()
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
return services, errg.Wait()
|
|
}
|
|
|
|
// listTaskARNs returns a map of clustersARN to a slice of task ARNs.
|
|
// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling.
|
|
// Tasks are listed in batches of 100 to respect AWS API limits (ListTasks allows up to 100 tasks per call).
|
|
// This method also uses pagination to handle cases where there are more than 100 tasks in a cluster.
|
|
func (d *ECSDiscovery) listTaskARNs(ctx context.Context, clusterARNs []string) (map[string][]string, error) {
|
|
mu := sync.Mutex{}
|
|
tasks := make(map[string][]string)
|
|
errg, ectx := errgroup.WithContext(ctx)
|
|
errg.SetLimit(d.cfg.RequestConcurrency)
|
|
for _, clusterARN := range clusterARNs {
|
|
errg.Go(func() error {
|
|
var (
|
|
nextToken *string
|
|
taskARNs []string
|
|
)
|
|
for {
|
|
resp, err := d.ecs.ListTasks(ectx, &ecs.ListTasksInput{
|
|
Cluster: aws.String(clusterARN),
|
|
NextToken: nextToken,
|
|
MaxResults: aws.Int32(100),
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("could not list tasks for cluster %q: %w", clusterARN, err)
|
|
}
|
|
|
|
taskARNs = append(taskARNs, resp.TaskArns...)
|
|
|
|
if resp.NextToken == nil {
|
|
break
|
|
}
|
|
nextToken = resp.NextToken
|
|
}
|
|
|
|
mu.Lock()
|
|
tasks[clusterARN] = taskARNs
|
|
mu.Unlock()
|
|
return nil
|
|
})
|
|
}
|
|
|
|
return tasks, errg.Wait()
|
|
}
|
|
|
|
// describeTasks returns a slice of tasks.
|
|
// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling.
|
|
// Tasks are described in batches of 100 to respect AWS API limits (DescribeTasks allows up to 100 tasks per call).
|
|
func (d *ECSDiscovery) describeTasks(ctx context.Context, clusterARN string, taskARNs []string) ([]types.Task, error) {
|
|
mu := sync.Mutex{}
|
|
var tasks []types.Task
|
|
errg, ectx := errgroup.WithContext(ctx)
|
|
errg.SetLimit(d.cfg.RequestConcurrency)
|
|
for batch := range slices.Chunk(taskARNs, 100) {
|
|
errg.Go(func() error {
|
|
resp, err := d.ecs.DescribeTasks(ectx, &ecs.DescribeTasksInput{
|
|
Cluster: aws.String(clusterARN),
|
|
Tasks: batch,
|
|
Include: []types.TaskField{"TAGS"},
|
|
})
|
|
if err != nil {
|
|
d.logger.Error("Failed to describe tasks", "cluster", clusterARN, "batch", batch, "error", err)
|
|
return fmt.Errorf("could not describe tasks in cluster %q: batch %v: %w", clusterARN, batch, err)
|
|
}
|
|
|
|
mu.Lock()
|
|
tasks = append(tasks, resp.Tasks...)
|
|
mu.Unlock()
|
|
return nil
|
|
})
|
|
}
|
|
|
|
return tasks, errg.Wait()
|
|
}
|
|
|
|
// describeContainerInstances returns a map of container instance ARN to EC2 instance ID
|
|
// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling.
|
|
// Container instances are described in batches of 100 to respect AWS API limits (DescribeContainerInstances allows up to 100 container instances per call).
|
|
func (d *ECSDiscovery) describeContainerInstances(ctx context.Context, clusterARN string, tasks []types.Task) (map[string]string, error) {
|
|
containerInstanceARNs := make([]string, 0, len(tasks))
|
|
for _, task := range tasks {
|
|
if task.ContainerInstanceArn != nil {
|
|
containerInstanceARNs = append(containerInstanceARNs, *task.ContainerInstanceArn)
|
|
}
|
|
}
|
|
|
|
if len(containerInstanceARNs) == 0 {
|
|
return make(map[string]string), nil
|
|
}
|
|
|
|
mu := sync.Mutex{}
|
|
containerInstToEC2 := make(map[string]string)
|
|
errg, ectx := errgroup.WithContext(ctx)
|
|
errg.SetLimit(d.cfg.RequestConcurrency)
|
|
for batch := range slices.Chunk(containerInstanceARNs, 100) {
|
|
errg.Go(func() error {
|
|
resp, err := d.ecs.DescribeContainerInstances(ectx, &ecs.DescribeContainerInstancesInput{
|
|
Cluster: aws.String(clusterARN),
|
|
ContainerInstances: batch,
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("could not describe container instances: %w", err)
|
|
}
|
|
|
|
for _, ci := range resp.ContainerInstances {
|
|
if ci.ContainerInstanceArn != nil && ci.Ec2InstanceId != nil {
|
|
mu.Lock()
|
|
containerInstToEC2[*ci.ContainerInstanceArn] = *ci.Ec2InstanceId
|
|
mu.Unlock()
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
return containerInstToEC2, errg.Wait()
|
|
}
|
|
|
|
// ec2InstanceInfo holds information retrieved from EC2 DescribeInstances.
// describeEC2Instances only creates entries for instances that report a
// private IP; the other string fields stay empty when not reported.
type ec2InstanceInfo struct {
	privateIP    string            // private IPv4 address of the instance
	publicIP     string            // public IPv4 address, if any
	subnetID     string            // subnet ID, if reported
	instanceType string            // EC2 instance type, if reported
	tags         map[string]string // instance tags, tag key -> tag value
}
|
|
|
|
// describeEC2Instances returns a map of EC2 instance ID to instance information.
|
|
// Uses concurrent requests limited by RequestConcurrency to respect AWS API throttling.
|
|
// This method does not use concurrency as it's a simple paginated call.
|
|
func (d *ECSDiscovery) describeEC2Instances(ctx context.Context, instanceIDs []string) (map[string]ec2InstanceInfo, error) {
|
|
if len(instanceIDs) == 0 {
|
|
return make(map[string]ec2InstanceInfo), nil
|
|
}
|
|
|
|
instanceInfo := make(map[string]ec2InstanceInfo)
|
|
var nextToken *string
|
|
|
|
for {
|
|
resp, err := d.ec2.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
|
|
InstanceIds: instanceIDs,
|
|
NextToken: nextToken,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not describe EC2 instances: %w", err)
|
|
}
|
|
|
|
for _, reservation := range resp.Reservations {
|
|
for _, instance := range reservation.Instances {
|
|
if instance.InstanceId != nil && instance.PrivateIpAddress != nil {
|
|
info := ec2InstanceInfo{
|
|
privateIP: *instance.PrivateIpAddress,
|
|
tags: make(map[string]string),
|
|
}
|
|
if instance.PublicIpAddress != nil {
|
|
info.publicIP = *instance.PublicIpAddress
|
|
}
|
|
if instance.SubnetId != nil {
|
|
info.subnetID = *instance.SubnetId
|
|
}
|
|
if instance.InstanceType != "" {
|
|
info.instanceType = string(instance.InstanceType)
|
|
}
|
|
// Collect EC2 instance tags
|
|
for _, tag := range instance.Tags {
|
|
if tag.Key != nil && tag.Value != nil {
|
|
info.tags[*tag.Key] = *tag.Value
|
|
}
|
|
}
|
|
instanceInfo[*instance.InstanceId] = info
|
|
}
|
|
}
|
|
}
|
|
|
|
if resp.NextToken == nil {
|
|
break
|
|
}
|
|
nextToken = resp.NextToken
|
|
}
|
|
|
|
return instanceInfo, nil
|
|
}
|
|
|
|
// describeNetworkInterfaces returns a map of ENI ID to public IP address.
|
|
// This is needed to get the public IP for tasks using awsvpc network mode, as the ENI is what gets the public IP, not the EC2 instance.
|
|
// This method does not use concurrency as it's a simple paginated call.
|
|
func (d *ECSDiscovery) describeNetworkInterfaces(ctx context.Context, tasks []types.Task) (map[string]string, error) {
|
|
eniIDs := make([]string, 0, len(tasks))
|
|
|
|
for _, task := range tasks {
|
|
for _, attachment := range task.Attachments {
|
|
if attachment.Type != nil && *attachment.Type == "ElasticNetworkInterface" {
|
|
for _, detail := range attachment.Details {
|
|
if detail.Name != nil && *detail.Name == "networkInterfaceId" && detail.Value != nil {
|
|
eniIDs = append(eniIDs, *detail.Value)
|
|
break
|
|
}
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(eniIDs) == 0 {
|
|
return make(map[string]string), nil
|
|
}
|
|
|
|
eniToPublicIP := make(map[string]string)
|
|
var nextToken *string
|
|
|
|
for {
|
|
resp, err := d.ec2.DescribeNetworkInterfaces(ctx, &ec2.DescribeNetworkInterfacesInput{
|
|
NetworkInterfaceIds: eniIDs,
|
|
NextToken: nextToken,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not describe network interfaces: %w", err)
|
|
}
|
|
|
|
for _, eni := range resp.NetworkInterfaces {
|
|
if eni.NetworkInterfaceId != nil && eni.Association != nil && eni.Association.PublicIp != nil {
|
|
eniToPublicIP[*eni.NetworkInterfaceId] = *eni.Association.PublicIp
|
|
}
|
|
}
|
|
|
|
if resp.NextToken == nil {
|
|
break
|
|
}
|
|
nextToken = resp.NextToken
|
|
}
|
|
|
|
return eniToPublicIP, nil
|
|
}
|
|
|
|
func (d *ECSDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
|
|
err := d.initEcsClient(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var clusters []string
|
|
if len(d.cfg.Clusters) == 0 {
|
|
clusters, err = d.listClusterARNs(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
clusters = d.cfg.Clusters
|
|
}
|
|
|
|
if len(clusters) == 0 {
|
|
return []*targetgroup.Group{
|
|
{
|
|
Source: d.cfg.Region,
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
tg := &targetgroup.Group{
|
|
Source: d.cfg.Region,
|
|
}
|
|
|
|
// Fetch cluster details, service ARNs, and task ARNs in parallel
|
|
var (
|
|
clusterMap map[string]types.Cluster
|
|
serviceMap map[string][]string
|
|
taskMap map[string][]string
|
|
)
|
|
|
|
clusterErrg, clusterCtx := errgroup.WithContext(ctx)
|
|
clusterErrg.Go(func() error {
|
|
var err error
|
|
clusterMap, err = d.describeClusters(clusterCtx, clusters)
|
|
return err
|
|
})
|
|
clusterErrg.Go(func() error {
|
|
var err error
|
|
serviceMap, err = d.listServiceARNs(clusterCtx, clusters)
|
|
return err
|
|
})
|
|
clusterErrg.Go(func() error {
|
|
var err error
|
|
taskMap, err = d.listTaskARNs(clusterCtx, clusters)
|
|
return err
|
|
})
|
|
|
|
if err := clusterErrg.Wait(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Use goroutines to process clusters in parallel
|
|
var (
|
|
clusterWg sync.WaitGroup
|
|
clusterMu sync.Mutex
|
|
clusterTargets []model.LabelSet
|
|
)
|
|
|
|
for clusterARN, taskARNs := range taskMap {
|
|
if len(taskARNs) == 0 {
|
|
continue
|
|
}
|
|
|
|
clusterWg.Add(1)
|
|
|
|
go func(cluster types.Cluster, serviceARNs, taskARNs []string) {
|
|
defer clusterWg.Done()
|
|
|
|
// Fetch services and tasks in parallel (they're independent)
|
|
var (
|
|
services map[string]types.Service
|
|
tasks []types.Task
|
|
)
|
|
|
|
resourceErrg, resourceCtx := errgroup.WithContext(ctx)
|
|
resourceErrg.Go(func() error {
|
|
var err error
|
|
services, err = d.describeServices(resourceCtx, *cluster.ClusterArn, serviceARNs)
|
|
if err != nil {
|
|
d.logger.Error("Failed to describe services for cluster", "cluster", *cluster.ClusterArn, "error", err)
|
|
}
|
|
return err
|
|
})
|
|
resourceErrg.Go(func() error {
|
|
var err error
|
|
tasks, err = d.describeTasks(resourceCtx, *cluster.ClusterArn, taskARNs)
|
|
if err != nil {
|
|
d.logger.Error("Failed to describe tasks for cluster", "cluster", *cluster.ClusterArn, "error", err)
|
|
}
|
|
return err
|
|
})
|
|
|
|
if err := resourceErrg.Wait(); err != nil {
|
|
return
|
|
}
|
|
|
|
// Fetch container instances and network interfaces in parallel (both depend on tasks)
|
|
var (
|
|
containerInstances map[string]string
|
|
eniToPublicIP map[string]string
|
|
)
|
|
|
|
instanceErrg, instanceCtx := errgroup.WithContext(ctx)
|
|
instanceErrg.Go(func() error {
|
|
var err error
|
|
containerInstances, err = d.describeContainerInstances(instanceCtx, *cluster.ClusterArn, tasks)
|
|
if err != nil {
|
|
d.logger.Error("Failed to describe container instances for cluster", "cluster", *cluster.ClusterArn, "error", err)
|
|
}
|
|
return err
|
|
})
|
|
instanceErrg.Go(func() error {
|
|
var err error
|
|
eniToPublicIP, err = d.describeNetworkInterfaces(instanceCtx, tasks)
|
|
if err != nil {
|
|
d.logger.Error("Failed to describe network interfaces for cluster", "cluster", *cluster.ClusterArn, "error", err)
|
|
}
|
|
return err
|
|
})
|
|
|
|
if err := instanceErrg.Wait(); err != nil {
|
|
return
|
|
}
|
|
|
|
ec2Instances := make(map[string]ec2InstanceInfo)
|
|
if len(containerInstances) > 0 {
|
|
// Deduplicate EC2 instance IDs (multiple tasks can share the same instance)
|
|
ec2InstanceIDSet := make(map[string]struct{})
|
|
for _, ec2ID := range containerInstances {
|
|
ec2InstanceIDSet[ec2ID] = struct{}{}
|
|
}
|
|
ec2InstanceIDs := make([]string, 0, len(ec2InstanceIDSet))
|
|
for ec2ID := range ec2InstanceIDSet {
|
|
ec2InstanceIDs = append(ec2InstanceIDs, ec2ID)
|
|
}
|
|
ec2Instances, err = d.describeEC2Instances(ctx, ec2InstanceIDs)
|
|
if err != nil {
|
|
d.logger.Error("Failed to describe EC2 instances for cluster", "cluster", *cluster.ClusterArn, "error", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
var (
|
|
taskWg sync.WaitGroup
|
|
taskMu sync.Mutex
|
|
taskTargets []model.LabelSet
|
|
)
|
|
|
|
for _, task := range tasks {
|
|
taskWg.Add(1)
|
|
|
|
go func(cluster types.Cluster, services map[string]types.Service, task types.Task, containerInstances map[string]string, ec2Instances map[string]ec2InstanceInfo, eniToPublicIP map[string]string) {
|
|
defer taskWg.Done()
|
|
|
|
var (
|
|
ipAddress, subnetID, publicIP string
|
|
networkMode string
|
|
ec2InstanceID, ec2InstanceType, ec2InstancePrivateIP, ec2InstancePublicIP string
|
|
)
|
|
|
|
// Try to get IP from ENI attachment (awsvpc mode)
|
|
var eniAttachment *types.Attachment
|
|
for _, attachment := range task.Attachments {
|
|
if attachment.Type != nil && *attachment.Type == "ElasticNetworkInterface" {
|
|
eniAttachment = &attachment
|
|
break
|
|
}
|
|
}
|
|
|
|
if eniAttachment != nil {
|
|
// awsvpc networking mode - get IP from ENI
|
|
networkMode = "awsvpc"
|
|
var eniID string
|
|
for _, detail := range eniAttachment.Details {
|
|
switch *detail.Name {
|
|
case "privateIPv4Address":
|
|
ipAddress = *detail.Value
|
|
case "subnetId":
|
|
subnetID = *detail.Value
|
|
case "networkInterfaceId":
|
|
eniID = *detail.Value
|
|
}
|
|
}
|
|
// Get public IP from ENI if available
|
|
if eniID != "" {
|
|
if pub, ok := eniToPublicIP[eniID]; ok {
|
|
publicIP = pub
|
|
}
|
|
}
|
|
} else if task.ContainerInstanceArn != nil {
|
|
// bridge/host networking mode - need to get EC2 instance IP and subnet
|
|
networkMode = "bridge"
|
|
var ok bool
|
|
ec2InstanceID, ok = containerInstances[*task.ContainerInstanceArn]
|
|
if ok {
|
|
info, ok := ec2Instances[ec2InstanceID]
|
|
if ok {
|
|
ipAddress = info.privateIP
|
|
publicIP = info.publicIP
|
|
subnetID = info.subnetID
|
|
ec2InstanceType = info.instanceType
|
|
ec2InstancePrivateIP = info.privateIP
|
|
ec2InstancePublicIP = info.publicIP
|
|
} else {
|
|
d.logger.Debug("EC2 instance info not found", "instance", ec2InstanceID, "task", *task.TaskArn)
|
|
}
|
|
} else {
|
|
d.logger.Debug("Container instance not found in map", "arn", *task.ContainerInstanceArn, "task", *task.TaskArn)
|
|
}
|
|
}
|
|
|
|
// Get EC2 instance metadata for awsvpc tasks running on EC2
|
|
// We want the instance type and the host IPs for advanced use cases
|
|
if networkMode == "awsvpc" && task.ContainerInstanceArn != nil {
|
|
var ok bool
|
|
ec2InstanceID, ok = containerInstances[*task.ContainerInstanceArn]
|
|
if ok {
|
|
info, ok := ec2Instances[ec2InstanceID]
|
|
if ok {
|
|
ec2InstanceType = info.instanceType
|
|
ec2InstancePrivateIP = info.privateIP
|
|
ec2InstancePublicIP = info.publicIP
|
|
}
|
|
}
|
|
}
|
|
|
|
if ipAddress == "" {
|
|
return
|
|
}
|
|
|
|
labels := model.LabelSet{
|
|
ecsLabelClusterARN: model.LabelValue(*cluster.ClusterArn),
|
|
ecsLabelCluster: model.LabelValue(*cluster.ClusterName),
|
|
ecsLabelTaskGroup: model.LabelValue(*task.Group),
|
|
ecsLabelTaskARN: model.LabelValue(*task.TaskArn),
|
|
ecsLabelTaskDefinition: model.LabelValue(*task.TaskDefinitionArn),
|
|
ecsLabelIPAddress: model.LabelValue(ipAddress),
|
|
ecsLabelRegion: model.LabelValue(d.cfg.Region),
|
|
ecsLabelLaunchType: model.LabelValue(task.LaunchType),
|
|
ecsLabelAvailabilityZone: model.LabelValue(*task.AvailabilityZone),
|
|
ecsLabelDesiredStatus: model.LabelValue(*task.DesiredStatus),
|
|
ecsLabelLastStatus: model.LabelValue(*task.LastStatus),
|
|
ecsLabelHealthStatus: model.LabelValue(task.HealthStatus),
|
|
ecsLabelNetworkMode: model.LabelValue(networkMode),
|
|
}
|
|
|
|
// Add subnet ID when available (awsvpc mode from ENI, bridge/host from EC2 instance)
|
|
if subnetID != "" {
|
|
labels[ecsLabelSubnetID] = model.LabelValue(subnetID)
|
|
}
|
|
|
|
// Add container instance and EC2 instance info for EC2 launch type
|
|
if task.ContainerInstanceArn != nil {
|
|
labels[ecsLabelContainerInstanceARN] = model.LabelValue(*task.ContainerInstanceArn)
|
|
}
|
|
if ec2InstanceID != "" {
|
|
labels[ecsLabelEC2InstanceID] = model.LabelValue(ec2InstanceID)
|
|
}
|
|
if ec2InstanceType != "" {
|
|
labels[ecsLabelEC2InstanceType] = model.LabelValue(ec2InstanceType)
|
|
}
|
|
if ec2InstancePrivateIP != "" {
|
|
labels[ecsLabelEC2InstancePrivateIP] = model.LabelValue(ec2InstancePrivateIP)
|
|
}
|
|
if ec2InstancePublicIP != "" {
|
|
labels[ecsLabelEC2InstancePublicIP] = model.LabelValue(ec2InstancePublicIP)
|
|
}
|
|
if publicIP != "" {
|
|
labels[ecsLabelPublicIP] = model.LabelValue(publicIP)
|
|
}
|
|
|
|
if task.PlatformFamily != nil {
|
|
labels[ecsLabelPlatformFamily] = model.LabelValue(*task.PlatformFamily)
|
|
}
|
|
if task.PlatformVersion != nil {
|
|
labels[ecsLabelPlatformVersion] = model.LabelValue(*task.PlatformVersion)
|
|
}
|
|
|
|
labels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(ipAddress, strconv.Itoa(d.cfg.Port)))
|
|
|
|
// Add cluster tags
|
|
for _, clusterTag := range cluster.Tags {
|
|
if clusterTag.Key != nil && clusterTag.Value != nil {
|
|
labels[model.LabelName(ecsLabelTagCluster+strutil.SanitizeLabelName(*clusterTag.Key))] = model.LabelValue(*clusterTag.Value)
|
|
}
|
|
}
|
|
|
|
// If this is not a standalone task, add service information and tags
|
|
if !isStandaloneTask(task) {
|
|
service, ok := services[getServiceNameFromTaskGroup(task)]
|
|
if !ok {
|
|
d.logger.Debug("Service not found for task", "task", *task.TaskArn, "service", getServiceNameFromTaskGroup(task))
|
|
}
|
|
if service.ServiceName != nil {
|
|
labels[ecsLabelService] = model.LabelValue(*service.ServiceName)
|
|
}
|
|
if service.ServiceArn != nil {
|
|
labels[ecsLabelServiceARN] = model.LabelValue(*service.ServiceArn)
|
|
}
|
|
if service.Status != nil {
|
|
labels[ecsLabelServiceStatus] = model.LabelValue(*service.Status)
|
|
}
|
|
|
|
// Add service tags
|
|
for _, serviceTag := range service.Tags {
|
|
if serviceTag.Key != nil && serviceTag.Value != nil {
|
|
labels[model.LabelName(ecsLabelTagService+strutil.SanitizeLabelName(*serviceTag.Key))] = model.LabelValue(*serviceTag.Value)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Add task tags
|
|
for _, taskTag := range task.Tags {
|
|
if taskTag.Key != nil && taskTag.Value != nil {
|
|
labels[model.LabelName(ecsLabelTagTask+strutil.SanitizeLabelName(*taskTag.Key))] = model.LabelValue(*taskTag.Value)
|
|
}
|
|
}
|
|
|
|
// Add EC2 instance tags (if running on EC2)
|
|
if ec2InstanceID != "" {
|
|
if info, ok := ec2Instances[ec2InstanceID]; ok {
|
|
for tagKey, tagValue := range info.tags {
|
|
labels[model.LabelName(ecsLabelTagEC2+strutil.SanitizeLabelName(tagKey))] = model.LabelValue(tagValue)
|
|
}
|
|
}
|
|
}
|
|
|
|
taskMu.Lock()
|
|
taskTargets = append(taskTargets, labels)
|
|
taskMu.Unlock()
|
|
}(cluster, services, task, containerInstances, ec2Instances, eniToPublicIP)
|
|
}
|
|
|
|
taskWg.Wait()
|
|
|
|
// Add this cluster's task targets to the overall collection
|
|
clusterMu.Lock()
|
|
clusterTargets = append(clusterTargets, taskTargets...)
|
|
clusterMu.Unlock()
|
|
}(clusterMap[clusterARN], serviceMap[clusterARN], taskARNs)
|
|
}
|
|
|
|
clusterWg.Wait()
|
|
|
|
// Set all targets to the target group
|
|
tg.Targets = clusterTargets
|
|
|
|
return []*targetgroup.Group{tg}, nil
|
|
}
|
|
|
|
func isStandaloneTask(task types.Task) bool {
|
|
// A standalone task will have a group of "family:task-def-name"
|
|
return task.Group != nil && strings.HasPrefix(*task.Group, "family:")
|
|
}
|
|
|
|
func getServiceNameFromTaskGroup(task types.Task) string {
|
|
return strings.Split(*task.Group, ":")[1]
|
|
}
|