Mirror of https://github.com/prometheus/prometheus.git, synced 2025-11-04 10:21:02 +01:00
For: #14355

This commit updates Prometheus to adopt the stdlib log/slog package in place of go-kit/log. As part of converting to slog, several other related changes were required to keep Prometheus working, including:

- removed the unused logging util func `RateLimit()`
- forward-ported the util/logging Deduper by implementing a small custom slog.Handler that performs the deduplication before chaining log calls to the underlying real slog.Logger (a sketch of the idea follows below)
- moved some of the JSON file logging functionality to use prometheus/common package functionality
- refactored some of the new JSON file logging for scraping
- changed the promql.QueryLogger interface to swap out logging methods for the relevant slog sugar wrappers
- updated many tests that used or replicated custom logging functionality, attempting to keep the logical goal of each test consistent through the transition
- added a healthy number of `if logger == nil { $makeLogger }` conditional checks in functions where no logger was provided -- old code using the go-kit/log.Logger interface had several places that would hit nil references when calling functions like `With()` to add keyvals on the new *slog.Logger type

Signed-off-by: TJ Hoplock <t.hoplock@gmail.com>
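As a rough illustration of the deduping handler mentioned above, here is a minimal, self-contained sketch, not the actual util/logging code: it drops any record whose message was already forwarded within a configurable window, and chains everything else to the wrapped handler. The names (dedupeHandler, dedupeState, newDedupeHandler) and the message-keyed, window-based policy are illustrative assumptions.

// Sketch of a deduplicating slog.Handler. Illustrative only; not the
// Prometheus util/logging implementation.
package main

import (
	"context"
	"log/slog"
	"os"
	"sync"
	"time"
)

// dedupeState is shared by all handlers derived via WithAttrs/WithGroup,
// so they suppress duplicates against a single common history.
type dedupeState struct {
	window time.Duration
	mtx    sync.Mutex
	seen   map[string]time.Time // message -> time it was last forwarded
}

type dedupeHandler struct {
	next  slog.Handler
	state *dedupeState
}

func newDedupeHandler(next slog.Handler, window time.Duration) *dedupeHandler {
	return &dedupeHandler{
		next:  next,
		state: &dedupeState{window: window, seen: map[string]time.Time{}},
	}
}

func (h *dedupeHandler) Enabled(ctx context.Context, level slog.Level) bool {
	return h.next.Enabled(ctx, level)
}

func (h *dedupeHandler) Handle(ctx context.Context, r slog.Record) error {
	h.state.mtx.Lock()
	last, seen := h.state.seen[r.Message]
	if seen && r.Time.Sub(last) < h.state.window {
		h.state.mtx.Unlock()
		return nil // duplicate within the window: drop it
	}
	h.state.seen[r.Message] = r.Time
	h.state.mtx.Unlock()
	return h.next.Handle(ctx, r) // chain to the underlying handler
}

func (h *dedupeHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
	return &dedupeHandler{next: h.next.WithAttrs(attrs), state: h.state}
}

func (h *dedupeHandler) WithGroup(name string) slog.Handler {
	return &dedupeHandler{next: h.next.WithGroup(name), state: h.state}
}

func main() {
	logger := slog.New(newDedupeHandler(slog.NewTextHandler(os.Stderr, nil), time.Minute))
	logger.Info("noisy message") // forwarded
	logger.Info("noisy message") // suppressed: same message within the window
}

Sharing the state pointer across WithAttrs/WithGroup derivatives keeps one suppression history no matter how the logger is later decorated with extra keyvals.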
237 lines · 7.8 KiB · Go
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
	"github.com/prometheus/common/model"
	"github.com/prometheus/common/promslog"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/timestamp"
	"github.com/prometheus/prometheus/rules"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"
	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
)

const maxSamplesInMemory = 5000

type queryRangeAPI interface {
	QueryRange(ctx context.Context, query string, r v1.Range, opts ...v1.Option) (model.Value, v1.Warnings, error)
}

type ruleImporter struct {
	logger *slog.Logger
	config ruleImporterConfig

	apiClient queryRangeAPI

	groups      map[string]*rules.Group
	ruleManager *rules.Manager
}

type ruleImporterConfig struct {
	outputDir        string
	start            time.Time
	end              time.Time
	evalInterval     time.Duration
	maxBlockDuration time.Duration
}

// newRuleImporter creates a new rule importer that can be used to parse and
// evaluate recording rule files, and create new series written to disk in blocks.
func newRuleImporter(logger *slog.Logger, config ruleImporterConfig, apiClient queryRangeAPI) *ruleImporter {
	logger.Info("new rule importer", "component", "backfiller", "start", config.start.Format(time.RFC822), "end", config.end.Format(time.RFC822))
	return &ruleImporter{
		logger:      logger,
		config:      config,
		apiClient:   apiClient,
		ruleManager: rules.NewManager(&rules.ManagerOptions{}),
	}
}

// loadGroups parses groups from a list of recording rule files.
func (importer *ruleImporter) loadGroups(_ context.Context, filenames []string) (errs []error) {
	groups, errs := importer.ruleManager.LoadGroups(importer.config.evalInterval, labels.Labels{}, "", nil, filenames...)
	if errs != nil {
		return errs
	}
	importer.groups = groups
	return nil
}

// importAll evaluates all the recording rules and creates new time series and writes them to disk in blocks.
func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
	for name, group := range importer.groups {
		importer.logger.Info("processing group", "component", "backfiller", "name", name)

		for i, r := range group.Rules() {
			importer.logger.Info("processing rule", "component", "backfiller", "id", i, "name", r.Name())
			if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), importer.config.start, importer.config.end, int64(importer.config.maxBlockDuration/time.Millisecond), group); err != nil {
				errs = append(errs, err)
			}
		}
	}
	return errs
}

// importRule queries a Prometheus API to evaluate rules at times in the past.
func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time,
	maxBlockDuration int64, grp *rules.Group,
) (err error) {
	blockDuration := getCompatibleBlockDuration(maxBlockDuration)
	startInMs := start.Unix() * int64(time.Second/time.Millisecond)
	endInMs := end.Unix() * int64(time.Second/time.Millisecond)

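	// Walk the requested range one block at a time. The integer division
	// floors startInMs to the block boundary at or before the requested
	// start, so every emitted block is aligned to a multiple of blockDuration.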
	for startOfBlock := blockDuration * (startInMs / blockDuration); startOfBlock <= endInMs; startOfBlock += blockDuration {
		endOfBlock := startOfBlock + blockDuration - 1

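		// Clamp the query window to the requested range, then align the first
		// evaluation with the group's schedule (EvalTimestamp), stepping
		// forward by the group interval until it lands inside this block.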
		currStart := max(startOfBlock/int64(time.Second/time.Millisecond), start.Unix())
		startWithAlignment := grp.EvalTimestamp(time.Unix(currStart, 0).UTC().UnixNano())
		for startWithAlignment.Unix() < currStart {
			startWithAlignment = startWithAlignment.Add(grp.Interval())
		}
		end := time.Unix(min(endOfBlock/int64(time.Second/time.Millisecond), end.Unix()), 0).UTC()
		if end.Before(startWithAlignment) {
			break
		}
		val, warnings, err := importer.apiClient.QueryRange(ctx,
			ruleExpr,
			v1.Range{
				Start: startWithAlignment,
				End:   end,
				Step:  grp.Interval(),
			},
		)
		if err != nil {
			return fmt.Errorf("query range: %w", err)
		}
		if warnings != nil {
			importer.logger.Warn("Range query returned warnings.", "warnings", warnings)
		}

		// To prevent races with compaction, a block writer only allows appending samples
		// that are at most half a block size older than the most recent sample appended so far.
		// However, in the way we use the block writer here, compaction doesn't happen, while we
		// also need to append samples throughout the whole block range. To allow that, we
		// pretend that the block is twice as large here, but only really add samples in the
		// original interval later.
		w, err := tsdb.NewBlockWriter(promslog.NewNopLogger(), importer.config.outputDir, 2*blockDuration)
		if err != nil {
			return fmt.Errorf("new block writer: %w", err)
		}
		var closed bool
		defer func() {
			if !closed {
				err = tsdb_errors.NewMulti(err, w.Close()).Err()
			}
		}()
		app := newMultipleAppender(ctx, w)
		var matrix model.Matrix
		switch val.Type() {
		case model.ValMatrix:
			matrix = val.(model.Matrix)

			for _, sample := range matrix {
				lb := labels.NewBuilder(labels.Labels{})

				for name, value := range sample.Metric {
					lb.Set(string(name), string(value))
				}

				// Setting the rule labels after the output of the query,
				// so they can override query output.
				ruleLabels.Range(func(l labels.Label) {
					lb.Set(l.Name, l.Value)
				})

				lb.Set(labels.MetricName, ruleName)
				lbls := lb.Labels()

				for _, value := range sample.Values {
					if err := app.add(ctx, lbls, timestamp.FromTime(value.Timestamp.Time()), float64(value.Value)); err != nil {
						return fmt.Errorf("add: %w", err)
					}
				}
			}
		default:
			return fmt.Errorf("rule result is wrong type %s", val.Type().String())
		}

		if err := app.flushAndCommit(ctx); err != nil {
			return fmt.Errorf("flush and commit: %w", err)
		}
		err = tsdb_errors.NewMulti(err, w.Close()).Err()
		closed = true
	}

	return err
}

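// newMultipleAppender returns a multipleAppender that writes through the given block writer.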
func newMultipleAppender(ctx context.Context, blockWriter *tsdb.BlockWriter) *multipleAppender {
	return &multipleAppender{
		maxSamplesInMemory: maxSamplesInMemory,
		writer:             blockWriter,
		appender:           blockWriter.Appender(ctx),
	}
}

// multipleAppender keeps track of how many series have been added to the current appender.
// If the max samples have been added, then all series are committed and a new appender is created.
type multipleAppender struct {
	maxSamplesInMemory int
	currentSampleCount int
	writer             *tsdb.BlockWriter
	appender           storage.Appender
}

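// add buffers one sample and commits the current batch once maxSamplesInMemory is reached.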
func (m *multipleAppender) add(ctx context.Context, l labels.Labels, t int64, v float64) error {
	if _, err := m.appender.Append(0, l, t, v); err != nil {
		return fmt.Errorf("multiappender append: %w", err)
	}
	m.currentSampleCount++
	if m.currentSampleCount >= m.maxSamplesInMemory {
		return m.commit(ctx)
	}
	return nil
}

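// commit commits the buffered samples and starts a fresh appender for the next batch.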
func (m *multipleAppender) commit(ctx context.Context) error {
	if m.currentSampleCount == 0 {
		return nil
	}
	if err := m.appender.Commit(); err != nil {
		return fmt.Errorf("multiappender commit: %w", err)
	}
	m.appender = m.writer.Appender(ctx)
	m.currentSampleCount = 0
	return nil
}

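// flushAndCommit commits any remaining buffered samples and flushes the block writer,
// persisting the block to disk.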
func (m *multipleAppender) flushAndCommit(ctx context.Context) error {
	if err := m.commit(ctx); err != nil {
		return err
	}
	if _, err := m.writer.Flush(ctx); err != nil {
		return fmt.Errorf("multiappender flush: %w", err)
	}
	return nil
}