// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aws

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net"
	"strconv"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	awsConfig "github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
	"github.com/aws/aws-sdk-go-v2/feature/ec2/imds"
	"github.com/aws/aws-sdk-go-v2/service/kafka"
	"github.com/aws/aws-sdk-go-v2/service/kafka/types"
	"github.com/aws/aws-sdk-go-v2/service/sts"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/discovery/refresh"
	"github.com/prometheus/prometheus/discovery/targetgroup"
	"github.com/prometheus/prometheus/util/strutil"
)

type NodeType string

const (
	NodeTypeBroker     NodeType = "BROKER"
	NodeTypeController NodeType = "CONTROLLER"
)
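
// Meta labels attached to discovered targets. model.MetaLabelPrefix is
// "__meta_", so these surface as __meta_msk_* labels during relabeling.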
const (
	mskLabel = model.MetaLabelPrefix + "msk_"

	// Cluster labels.
	mskLabelCluster                      = mskLabel + "cluster_"
	mskLabelClusterName                  = mskLabelCluster + "name"
	mskLabelClusterARN                   = mskLabelCluster + "arn"
	mskLabelClusterState                 = mskLabelCluster + "state"
	mskLabelClusterType                  = mskLabelCluster + "type"
	mskLabelClusterVersion               = mskLabelCluster + "version"
	mskLabelClusterJmxExporterEnabled    = mskLabelCluster + "jmx_exporter_enabled"
	mskLabelClusterConfigurationARN      = mskLabelCluster + "configuration_arn"
	mskLabelClusterConfigurationRevision = mskLabelCluster + "configuration_revision"
	mskLabelClusterKafkaVersion          = mskLabelCluster + "kafka_version"
	mskLabelClusterTags                  = mskLabelCluster + "tag_"

	// Node labels.
	mskLabelNode             = mskLabel + "node_"
	mskLabelNodeType         = mskLabelNode + "type"
	mskLabelNodeARN          = mskLabelNode + "arn"
	mskLabelNodeAddedTime    = mskLabelNode + "added_time"
	mskLabelNodeInstanceType = mskLabelNode + "instance_type"
	mskLabelNodeAttachedENI  = mskLabelNode + "attached_eni"

	// Broker labels.
	mskLabelBroker                    = mskLabel + "broker_"
	mskLabelBrokerEndpointIndex       = mskLabelBroker + "endpoint_index"
	mskLabelBrokerID                  = mskLabelBroker + "id"
	mskLabelBrokerClientSubnet        = mskLabelBroker + "client_subnet"
	mskLabelBrokerClientVPCIP         = mskLabelBroker + "client_vpc_ip"
	mskLabelBrokerNodeExporterEnabled = mskLabelBroker + "node_exporter_enabled"

	// Controller labels.
	mskLabelController              = mskLabel + "controller_"
	mskLabelControllerEndpointIndex = mskLabelController + "endpoint_index"
)

// DefaultMSKSDConfig is the default MSK SD configuration.
var DefaultMSKSDConfig = MSKSDConfig{
	Port:             80,
	RefreshInterval:  model.Duration(60 * time.Second),
	HTTPClientConfig: config.DefaultHTTPClientConfig,
}

func init() {
	discovery.RegisterConfig(&MSKSDConfig{})
}
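
// A sketch of how this mechanism would be referenced from prometheus.yml,
// assuming the usual <name>_sd_configs naming convention for registered
// discoverers and purely illustrative values for region, port and
// refresh_interval:
//
//	scrape_configs:
//	  - job_name: msk
//	    msk_sd_configs:
//	      - region: us-east-1
//	        port: 11001
//	        refresh_interval: 60s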

// MSKSDConfig is the configuration for MSK based service discovery.
type MSKSDConfig struct {
	Region          string         `yaml:"region"`
	Endpoint        string         `yaml:"endpoint"`
	AccessKey       string         `yaml:"access_key,omitempty"`
	SecretKey       config.Secret  `yaml:"secret_key,omitempty"`
	Profile         string         `yaml:"profile,omitempty"`
	RoleARN         string         `yaml:"role_arn,omitempty"`
	Clusters        []string       `yaml:"clusters,omitempty"`
	Port            int            `yaml:"port"`
	RefreshInterval model.Duration `yaml:"refresh_interval,omitempty"`

	HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
}

// NewDiscovererMetrics implements discovery.Config.
func (*MSKSDConfig) NewDiscovererMetrics(_ prometheus.Registerer, rmi discovery.RefreshMetricsInstantiator) discovery.DiscovererMetrics {
	return &mskMetrics{
		refreshMetrics: rmi,
	}
}

// Name returns the name of the MSK Config.
func (*MSKSDConfig) Name() string { return "msk" }

// NewDiscoverer returns a Discoverer for the MSK Config.
func (c *MSKSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discovery.Discoverer, error) {
	return NewMSKDiscovery(c, opts)
}

// UnmarshalYAML implements the yaml.Unmarshaler interface for the MSK Config.
func (c *MSKSDConfig) UnmarshalYAML(unmarshal func(any) error) error {
	*c = DefaultMSKSDConfig
	type plain MSKSDConfig
	err := unmarshal((*plain)(c))
	if err != nil {
		return err
	}

	if c.Region == "" {
		cfg, err := awsConfig.LoadDefaultConfig(context.Background())
		if err != nil {
			return err
		}
		if cfg.Region != "" {
			// Use the region already present in the default config (e.g. set via environment variables).
			c.Region = cfg.Region
		}

		if c.Region == "" {
			// Otherwise, try to get the region from IMDS.
			imdsClient := imds.NewFromConfig(cfg)
			region, err := imdsClient.GetRegion(context.Background(), &imds.GetRegionInput{})
			if err != nil {
				return err
			}
			c.Region = region.Region
		}
	}

	if c.Region == "" {
		return errors.New("MSK SD configuration requires a region")
	}

	return c.HTTPClientConfig.Validate()
}
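
// mskClient is the subset of the AWS MSK (Kafka) API used by the discoverer.
// Keeping it as an interface allows the client to be stubbed in tests.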
type mskClient interface {
	DescribeClusterV2(context.Context, *kafka.DescribeClusterV2Input, ...func(*kafka.Options)) (*kafka.DescribeClusterV2Output, error)
	ListClustersV2(context.Context, *kafka.ListClustersV2Input, ...func(*kafka.Options)) (*kafka.ListClustersV2Output, error)
	ListNodes(context.Context, *kafka.ListNodesInput, ...func(*kafka.Options)) (*kafka.ListNodesOutput, error)
}

// MSKDiscovery periodically performs MSK-SD requests. It implements
// the Discoverer interface.
type MSKDiscovery struct {
	*refresh.Discovery
	logger *slog.Logger
	cfg    *MSKSDConfig
	msk    mskClient
}

// NewMSKDiscovery returns a new MSKDiscovery which periodically refreshes its targets.
func NewMSKDiscovery(conf *MSKSDConfig, opts discovery.DiscovererOptions) (*MSKDiscovery, error) {
	m, ok := opts.Metrics.(*mskMetrics)
	if !ok {
		return nil, errors.New("invalid discovery metrics type")
	}

	if opts.Logger == nil {
		opts.Logger = promslog.NewNopLogger()
	}
	d := &MSKDiscovery{
		logger: opts.Logger,
		cfg:    conf,
	}
	d.Discovery = refresh.NewDiscovery(
		refresh.Options{
			Logger:              opts.Logger,
			Mech:                "msk",
			Interval:            time.Duration(d.cfg.RefreshInterval),
			RefreshF:            d.refresh,
			MetricsInstantiator: m.refreshMetrics,
		},
	)
	return d, nil
}
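
// initMskClient lazily builds the MSK API client on first use. Credentials are
// resolved by the AWS SDK: static access/secret keys, if configured, take
// precedence, then an optional shared-config profile, otherwise the default
// credential chain; when role_arn is set, that role is assumed via STS on top
// of the base credentials. A ListClustersV2 call with a 10s timeout verifies
// that the resulting credentials work.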
func (d *MSKDiscovery) initMskClient(ctx context.Context) error {
	if d.msk != nil {
		return nil
	}

	if d.cfg.Region == "" {
		return errors.New("region must be set for MSK service discovery")
	}

	// Build the HTTP client from the provided HTTPClientConfig.
	client, err := config.NewClientFromConfig(d.cfg.HTTPClientConfig, "msk_sd")
	if err != nil {
		return err
	}

	// Build the AWS config with the provided region.
	var configOptions []func(*awsConfig.LoadOptions) error
	configOptions = append(configOptions, awsConfig.WithRegion(d.cfg.Region))
	configOptions = append(configOptions, awsConfig.WithHTTPClient(client))

	// Only set static credentials if both the access key and secret key are provided.
	// Otherwise, let the AWS SDK use its default credential chain.
	if d.cfg.AccessKey != "" && d.cfg.SecretKey != "" {
		credProvider := credentials.NewStaticCredentialsProvider(d.cfg.AccessKey, string(d.cfg.SecretKey), "")
		configOptions = append(configOptions, awsConfig.WithCredentialsProvider(credProvider))
	}

	if d.cfg.Profile != "" {
		configOptions = append(configOptions, awsConfig.WithSharedConfigProfile(d.cfg.Profile))
	}

	cfg, err := awsConfig.LoadDefaultConfig(ctx, configOptions...)
	if err != nil {
		d.logger.Error("Failed to create AWS config", "error", err)
		return fmt.Errorf("could not create aws config: %w", err)
	}

	// If the role ARN is set, assume the role to get credentials and set the credentials provider in the config.
	if d.cfg.RoleARN != "" {
		assumeProvider := stscreds.NewAssumeRoleProvider(sts.NewFromConfig(cfg), d.cfg.RoleARN)
		cfg.Credentials = aws.NewCredentialsCache(assumeProvider)
	}

	d.msk = kafka.NewFromConfig(cfg, func(options *kafka.Options) {
		if d.cfg.Endpoint != "" {
			options.BaseEndpoint = &d.cfg.Endpoint
		}
		options.HTTPClient = client
	})

	// Test the credentials by making a simple API call.
	testCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	_, err = d.msk.ListClustersV2(testCtx, &kafka.ListClustersV2Input{})
	if err != nil {
		d.logger.Error("Failed to test MSK credentials", "error", err)
		return fmt.Errorf("MSK credential test failed: %w", err)
	}

	return nil
}
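
// describeClusters describes the explicitly configured cluster ARNs, one
// concurrent DescribeClusterV2 call per ARN, and aggregates any errors.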
func (d *MSKDiscovery) describeClusters(ctx context.Context, clusterARNs []string) ([]types.Cluster, error) {
	var (
		clusters []types.Cluster
		wg       sync.WaitGroup
		mu       sync.Mutex
		errs     []error
	)
	for _, clusterARN := range clusterARNs {
		wg.Add(1)
		go func(clusterARN string) {
			defer wg.Done()
			cluster, err := d.msk.DescribeClusterV2(ctx, &kafka.DescribeClusterV2Input{
				ClusterArn: aws.String(clusterARN),
			})
			if err != nil {
				mu.Lock()
				errs = append(errs, fmt.Errorf("could not describe cluster %v: %w", clusterARN, err))
				mu.Unlock()
				return
			}
			mu.Lock()
			clusters = append(clusters, *cluster.ClusterInfo)
			mu.Unlock()
		}(clusterARN)
	}
	wg.Wait()
	if len(errs) > 0 {
		return nil, fmt.Errorf("errors occurred while describing clusters: %v", errs)
	}

	return clusters, nil
}
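
// listClusters pages through ListClustersV2 and returns every PROVISIONED
// cluster in the configured region, 100 results per page.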
func (d *MSKDiscovery) listClusters(ctx context.Context) ([]types.Cluster, error) {
	var (
		clusters  []types.Cluster
		nextToken *string
	)
	for {
		listClustersInput := kafka.ListClustersV2Input{
			ClusterTypeFilter: aws.String("PROVISIONED"),
			MaxResults:        aws.Int32(100),
			NextToken:         nextToken,
		}

		resp, err := d.msk.ListClustersV2(ctx, &listClustersInput)
		if err != nil {
			return nil, fmt.Errorf("could not list clusters: %w", err)
		}

		clusters = append(clusters, resp.ClusterInfoList...)
		if resp.NextToken == nil {
			break
		}
		nextToken = resp.NextToken
	}

	return clusters, nil
}
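
// listNodes pages through ListNodes and returns all nodes of a single cluster.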
func (d *MSKDiscovery) listNodes(ctx context.Context, clusterARN string) ([]types.NodeInfo, error) {
	var (
		nodes     []types.NodeInfo
		nextToken *string
	)
	for {
		resp, err := d.msk.ListNodes(ctx, &kafka.ListNodesInput{
			ClusterArn: aws.String(clusterARN),
			MaxResults: aws.Int32(100),
			NextToken:  nextToken,
		})
		if err != nil {
			return nil, fmt.Errorf("could not list nodes for cluster %v: %w", clusterARN, err)
		}

		nodes = append(nodes, resp.NodeInfoList...)
		if resp.NextToken == nil {
			break
		}
		nextToken = resp.NextToken
	}

	return nodes, nil
}
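
// refresh resolves the clusters to discover (either the configured ARNs or
// every provisioned cluster in the region), lists each cluster's nodes
// concurrently, and emits one target per broker or controller endpoint, all
// collected into a single target group whose source is the region.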
func (d *MSKDiscovery) refresh(ctx context.Context) ([]*targetgroup.Group, error) {
	err := d.initMskClient(ctx)
	if err != nil {
		return nil, err
	}

	tg := &targetgroup.Group{
		Source: d.cfg.Region,
	}

	var clusters []types.Cluster
	if len(d.cfg.Clusters) > 0 {
		clusters, err = d.describeClusters(ctx, d.cfg.Clusters)
		if err != nil {
			return nil, err
		}
	} else {
		clusters, err = d.listClusters(ctx)
		if err != nil {
			return nil, err
		}
	}

	var (
		targetsMu sync.Mutex
		wg        sync.WaitGroup
	)
	for _, cluster := range clusters {
		wg.Add(1)
		go func(cluster types.Cluster) {
			defer wg.Done()

			nodes, err := d.listNodes(ctx, aws.ToString(cluster.ClusterArn))
			if err != nil {
				d.logger.Error("Failed to list nodes", "cluster", aws.ToString(cluster.ClusterName), "error", err)
				return
			}

			for _, node := range nodes {
				labels := model.LabelSet{
					mskLabelClusterName:                  model.LabelValue(aws.ToString(cluster.ClusterName)),
					mskLabelClusterARN:                   model.LabelValue(aws.ToString(cluster.ClusterArn)),
					mskLabelClusterState:                 model.LabelValue(string(cluster.State)),
					mskLabelClusterType:                  model.LabelValue(string(cluster.ClusterType)),
					mskLabelClusterVersion:               model.LabelValue(aws.ToString(cluster.CurrentVersion)),
					mskLabelNodeARN:                      model.LabelValue(aws.ToString(node.NodeARN)),
					mskLabelNodeAddedTime:                model.LabelValue(aws.ToString(node.AddedToClusterTime)),
					mskLabelNodeInstanceType:             model.LabelValue(aws.ToString(node.InstanceType)),
					mskLabelClusterJmxExporterEnabled:    model.LabelValue(strconv.FormatBool(*cluster.Provisioned.OpenMonitoring.Prometheus.JmxExporter.EnabledInBroker)),
					mskLabelClusterConfigurationARN:      model.LabelValue(aws.ToString(cluster.Provisioned.CurrentBrokerSoftwareInfo.ConfigurationArn)),
					mskLabelClusterConfigurationRevision: model.LabelValue(strconv.FormatInt(*cluster.Provisioned.CurrentBrokerSoftwareInfo.ConfigurationRevision, 10)),
					mskLabelClusterKafkaVersion:          model.LabelValue(aws.ToString(cluster.Provisioned.CurrentBrokerSoftwareInfo.KafkaVersion)),
				}

				for key, value := range cluster.Tags {
					labels[model.LabelName(mskLabelClusterTags+strutil.SanitizeLabelName(key))] = model.LabelValue(value)
				}

				switch nodeType(node) {
				case NodeTypeBroker:
					labels[mskLabelNodeType] = model.LabelValue(NodeTypeBroker)
					labels[mskLabelNodeAttachedENI] = model.LabelValue(aws.ToString(node.BrokerNodeInfo.AttachedENIId))
					labels[mskLabelBrokerID] = model.LabelValue(fmt.Sprintf("%.0f", aws.ToFloat64(node.BrokerNodeInfo.BrokerId)))
					labels[mskLabelBrokerClientSubnet] = model.LabelValue(aws.ToString(node.BrokerNodeInfo.ClientSubnet))
					labels[mskLabelBrokerClientVPCIP] = model.LabelValue(aws.ToString(node.BrokerNodeInfo.ClientVpcIpAddress))
					labels[mskLabelBrokerNodeExporterEnabled] = model.LabelValue(strconv.FormatBool(*cluster.Provisioned.OpenMonitoring.Prometheus.NodeExporter.EnabledInBroker))

					for idx, endpoint := range node.BrokerNodeInfo.Endpoints {
						endpointLabels := labels.Clone()
						endpointLabels[mskLabelBrokerEndpointIndex] = model.LabelValue(strconv.Itoa(idx))
						endpointLabels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(endpoint, strconv.Itoa(d.cfg.Port)))

						targetsMu.Lock()
						tg.Targets = append(tg.Targets, endpointLabels)
						targetsMu.Unlock()
					}

				case NodeTypeController:
					labels[mskLabelNodeType] = model.LabelValue(NodeTypeController)

					for idx, endpoint := range node.ControllerNodeInfo.Endpoints {
						endpointLabels := labels.Clone()
						endpointLabels[mskLabelControllerEndpointIndex] = model.LabelValue(strconv.Itoa(idx))
						endpointLabels[model.AddressLabel] = model.LabelValue(net.JoinHostPort(endpoint, strconv.Itoa(d.cfg.Port)))

						targetsMu.Lock()
						tg.Targets = append(tg.Targets, endpointLabels)
						targetsMu.Unlock()
					}
				default:
					continue
				}
			}
		}(cluster)
	}
	wg.Wait()

	return []*targetgroup.Group{tg}, nil
}
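
// nodeType reports whether a node is a broker or a controller, based on which
// node info field is populated.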
func nodeType(node types.NodeInfo) NodeType {
	if node.BrokerNodeInfo != nil {
		return NodeTypeBroker
	} else if node.ControllerNodeInfo != nil {
		return NodeTypeController
	}
	return ""
}