mirror of
				https://github.com/traefik/traefik.git
				synced 2025-11-04 10:21:15 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			145 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			145 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package metrics
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"time"
 | 
						|
 | 
						|
	kitlog "github.com/go-kit/kit/log"
 | 
						|
	"github.com/go-kit/kit/metrics/influx"
 | 
						|
	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
 | 
						|
	influxdb2api "github.com/influxdata/influxdb-client-go/v2/api"
 | 
						|
	"github.com/influxdata/influxdb-client-go/v2/api/write"
 | 
						|
	influxdb2log "github.com/influxdata/influxdb-client-go/v2/log"
 | 
						|
	influxdb "github.com/influxdata/influxdb1-client/v2"
 | 
						|
	"github.com/traefik/traefik/v2/pkg/log"
 | 
						|
	"github.com/traefik/traefik/v2/pkg/safe"
 | 
						|
	"github.com/traefik/traefik/v2/pkg/types"
 | 
						|
)
 | 
						|
 | 
						|
var (
 | 
						|
	influxDB2Ticker *time.Ticker
 | 
						|
	influxDB2Store  *influx.Influx
 | 
						|
	influxDB2Client influxdb2.Client
 | 
						|
)
 | 
						|
 | 
						|
// RegisterInfluxDB2 creates metrics exporter for InfluxDB2.
 | 
						|
func RegisterInfluxDB2(ctx context.Context, config *types.InfluxDB2) Registry {
 | 
						|
	if influxDB2Client == nil {
 | 
						|
		var err error
 | 
						|
		if influxDB2Client, err = newInfluxDB2Client(config); err != nil {
 | 
						|
			log.FromContext(ctx).Error(err)
 | 
						|
			return nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if influxDB2Store == nil {
 | 
						|
		influxDB2Store = influx.New(
 | 
						|
			config.AdditionalLabels,
 | 
						|
			influxdb.BatchPointsConfig{},
 | 
						|
			kitlog.LoggerFunc(func(kv ...interface{}) error {
 | 
						|
				log.FromContext(ctx).Error(kv)
 | 
						|
				return nil
 | 
						|
			}),
 | 
						|
		)
 | 
						|
 | 
						|
		influxDB2Ticker = time.NewTicker(time.Duration(config.PushInterval))
 | 
						|
 | 
						|
		safe.Go(func() {
 | 
						|
			wc := influxDB2Client.WriteAPIBlocking(config.Org, config.Bucket)
 | 
						|
			influxDB2Store.WriteLoop(ctx, influxDB2Ticker.C, influxDB2Writer{wc: wc})
 | 
						|
		})
 | 
						|
	}
 | 
						|
 | 
						|
	registry := &standardRegistry{
 | 
						|
		configReloadsCounter:           influxDB2Store.NewCounter(influxDBConfigReloadsName),
 | 
						|
		configReloadsFailureCounter:    influxDB2Store.NewCounter(influxDBConfigReloadsFailureName),
 | 
						|
		lastConfigReloadSuccessGauge:   influxDB2Store.NewGauge(influxDBLastConfigReloadSuccessName),
 | 
						|
		lastConfigReloadFailureGauge:   influxDB2Store.NewGauge(influxDBLastConfigReloadFailureName),
 | 
						|
		tlsCertsNotAfterTimestampGauge: influxDB2Store.NewGauge(influxDBTLSCertsNotAfterTimestampName),
 | 
						|
	}
 | 
						|
 | 
						|
	if config.AddEntryPointsLabels {
 | 
						|
		registry.epEnabled = config.AddEntryPointsLabels
 | 
						|
		registry.entryPointReqsCounter = influxDB2Store.NewCounter(influxDBEntryPointReqsName)
 | 
						|
		registry.entryPointReqsTLSCounter = influxDB2Store.NewCounter(influxDBEntryPointReqsTLSName)
 | 
						|
		registry.entryPointReqDurationHistogram, _ = NewHistogramWithScale(influxDB2Store.NewHistogram(influxDBEntryPointReqDurationName), time.Second)
 | 
						|
		registry.entryPointOpenConnsGauge = influxDB2Store.NewGauge(influxDBEntryPointOpenConnsName)
 | 
						|
	}
 | 
						|
 | 
						|
	if config.AddRoutersLabels {
 | 
						|
		registry.routerEnabled = config.AddRoutersLabels
 | 
						|
		registry.routerReqsCounter = influxDB2Store.NewCounter(influxDBRouterReqsName)
 | 
						|
		registry.routerReqsTLSCounter = influxDB2Store.NewCounter(influxDBRouterReqsTLSName)
 | 
						|
		registry.routerReqDurationHistogram, _ = NewHistogramWithScale(influxDB2Store.NewHistogram(influxDBRouterReqsDurationName), time.Second)
 | 
						|
		registry.routerOpenConnsGauge = influxDB2Store.NewGauge(influxDBORouterOpenConnsName)
 | 
						|
	}
 | 
						|
 | 
						|
	if config.AddServicesLabels {
 | 
						|
		registry.svcEnabled = config.AddServicesLabels
 | 
						|
		registry.serviceReqsCounter = influxDB2Store.NewCounter(influxDBServiceReqsName)
 | 
						|
		registry.serviceReqsTLSCounter = influxDB2Store.NewCounter(influxDBServiceReqsTLSName)
 | 
						|
		registry.serviceReqDurationHistogram, _ = NewHistogramWithScale(influxDB2Store.NewHistogram(influxDBServiceReqsDurationName), time.Second)
 | 
						|
		registry.serviceRetriesCounter = influxDB2Store.NewCounter(influxDBServiceRetriesTotalName)
 | 
						|
		registry.serviceOpenConnsGauge = influxDB2Store.NewGauge(influxDBServiceOpenConnsName)
 | 
						|
		registry.serviceServerUpGauge = influxDB2Store.NewGauge(influxDBServiceServerUpName)
 | 
						|
	}
 | 
						|
 | 
						|
	return registry
 | 
						|
}
 | 
						|
 | 
						|
// StopInfluxDB2 stops and resets InfluxDB2 client, ticker and store.
 | 
						|
func StopInfluxDB2() {
 | 
						|
	if influxDB2Client != nil {
 | 
						|
		influxDB2Client.Close()
 | 
						|
	}
 | 
						|
	influxDB2Client = nil
 | 
						|
 | 
						|
	if influxDB2Ticker != nil {
 | 
						|
		influxDB2Ticker.Stop()
 | 
						|
	}
 | 
						|
	influxDB2Ticker = nil
 | 
						|
 | 
						|
	influxDB2Store = nil
 | 
						|
}
 | 
						|
 | 
						|
// newInfluxDB2Client creates an influxdb2.Client.
 | 
						|
func newInfluxDB2Client(config *types.InfluxDB2) (influxdb2.Client, error) {
 | 
						|
	if config.Token == "" || config.Org == "" || config.Bucket == "" {
 | 
						|
		return nil, errors.New("token, org or bucket property is missing")
 | 
						|
	}
 | 
						|
 | 
						|
	// Disable InfluxDB2 logs.
 | 
						|
	// See https://github.com/influxdata/influxdb-client-go/blob/v2.7.0/options.go#L128
 | 
						|
	influxdb2log.Log = nil
 | 
						|
 | 
						|
	return influxdb2.NewClient(config.Address, config.Token), nil
 | 
						|
}
 | 
						|
 | 
						|
type influxDB2Writer struct {
 | 
						|
	wc influxdb2api.WriteAPIBlocking
 | 
						|
}
 | 
						|
 | 
						|
func (w influxDB2Writer) Write(bp influxdb.BatchPoints) error {
 | 
						|
	ctx := log.With(context.Background(), log.Str(log.MetricsProviderName, "influxdb2"))
 | 
						|
	logger := log.FromContext(ctx)
 | 
						|
 | 
						|
	wps := make([]*write.Point, 0, len(bp.Points()))
 | 
						|
	for _, p := range bp.Points() {
 | 
						|
		fields, err := p.Fields()
 | 
						|
		if err != nil {
 | 
						|
			logger.Errorf("Error while getting %s point fields: %s", p.Name(), err)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		wps = append(wps, influxdb2.NewPoint(
 | 
						|
			p.Name(),
 | 
						|
			p.Tags(),
 | 
						|
			fields,
 | 
						|
			p.Time(),
 | 
						|
		))
 | 
						|
	}
 | 
						|
 | 
						|
	return w.wc.WritePoint(ctx, wps...)
 | 
						|
}
 |