mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-10-31 08:21:16 +01:00 
			
		
		
		
	Signed-off-by: Matthieu MOREL <mmorel-35@users.noreply.github.com> Co-authored-by: Matthieu MOREL <mmorel-35@users.noreply.github.com>
		
			
				
	
	
		
			229 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			229 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2020 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package main
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"math"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/go-kit/log"
 | |
| 
 | |
| 	"github.com/prometheus/prometheus/model/labels"
 | |
| 	"github.com/prometheus/prometheus/model/textparse"
 | |
| 	"github.com/prometheus/prometheus/tsdb"
 | |
| 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 | |
| )
 | |
| 
 | |
| func getMinAndMaxTimestamps(p textparse.Parser) (int64, int64, error) {
 | |
| 	var maxt, mint int64 = math.MinInt64, math.MaxInt64
 | |
| 
 | |
| 	for {
 | |
| 		entry, err := p.Next()
 | |
| 		if errors.Is(err, io.EOF) {
 | |
| 			break
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			return 0, 0, fmt.Errorf("next: %w", err)
 | |
| 		}
 | |
| 
 | |
| 		if entry != textparse.EntrySeries {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		_, ts, _ := p.Series()
 | |
| 		if ts == nil {
 | |
| 			return 0, 0, fmt.Errorf("expected timestamp for series got none")
 | |
| 		}
 | |
| 
 | |
| 		if *ts > maxt {
 | |
| 			maxt = *ts
 | |
| 		}
 | |
| 		if *ts < mint {
 | |
| 			mint = *ts
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if maxt == math.MinInt64 {
 | |
| 		maxt = 0
 | |
| 	}
 | |
| 	if mint == math.MaxInt64 {
 | |
| 		mint = 0
 | |
| 	}
 | |
| 
 | |
| 	return maxt, mint, nil
 | |
| }
 | |
| 
 | |
| func getCompatibleBlockDuration(maxBlockDuration int64) int64 {
 | |
| 	blockDuration := tsdb.DefaultBlockDuration
 | |
| 	if maxBlockDuration > tsdb.DefaultBlockDuration {
 | |
| 		ranges := tsdb.ExponentialBlockRanges(tsdb.DefaultBlockDuration, 10, 3)
 | |
| 		idx := len(ranges) - 1 // Use largest range if user asked for something enormous.
 | |
| 		for i, v := range ranges {
 | |
| 			if v > maxBlockDuration {
 | |
| 				idx = i - 1
 | |
| 				break
 | |
| 			}
 | |
| 		}
 | |
| 		blockDuration = ranges[idx]
 | |
| 	}
 | |
| 	return blockDuration
 | |
| }
 | |
| 
 | |
| func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesInAppender int, outputDir string, humanReadable, quiet bool) (returnErr error) {
 | |
| 	blockDuration := getCompatibleBlockDuration(maxBlockDuration)
 | |
| 	mint = blockDuration * (mint / blockDuration)
 | |
| 
 | |
| 	db, err := tsdb.OpenDBReadOnly(outputDir, nil)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		returnErr = tsdb_errors.NewMulti(returnErr, db.Close()).Err()
 | |
| 	}()
 | |
| 
 | |
| 	var (
 | |
| 		wroteHeader  bool
 | |
| 		nextSampleTs int64 = math.MaxInt64
 | |
| 	)
 | |
| 
 | |
| 	for t := mint; t <= maxt; t = t + blockDuration {
 | |
| 		tsUpper := t + blockDuration
 | |
| 		if nextSampleTs != math.MaxInt64 && nextSampleTs >= tsUpper {
 | |
| 			// The next sample is not in this timerange, we can avoid parsing
 | |
| 			// the file for this timerange.
 | |
| 			continue
 | |
| 		}
 | |
| 		nextSampleTs = math.MaxInt64
 | |
| 
 | |
| 		err := func() error {
 | |
| 			// To prevent races with compaction, a block writer only allows appending samples
 | |
| 			// that are at most half a block size older than the most recent sample appended so far.
 | |
| 			// However, in the way we use the block writer here, compaction doesn't happen, while we
 | |
| 			// also need to append samples throughout the whole block range. To allow that, we
 | |
| 			// pretend that the block is twice as large here, but only really add sample in the
 | |
| 			// original interval later.
 | |
| 			w, err := tsdb.NewBlockWriter(log.NewNopLogger(), outputDir, 2*blockDuration)
 | |
| 			if err != nil {
 | |
| 				return fmt.Errorf("block writer: %w", err)
 | |
| 			}
 | |
| 			defer func() {
 | |
| 				err = tsdb_errors.NewMulti(err, w.Close()).Err()
 | |
| 			}()
 | |
| 
 | |
| 			ctx := context.Background()
 | |
| 			app := w.Appender(ctx)
 | |
| 			p := textparse.NewOpenMetricsParser(input)
 | |
| 			samplesCount := 0
 | |
| 			for {
 | |
| 				e, err := p.Next()
 | |
| 				if errors.Is(err, io.EOF) {
 | |
| 					break
 | |
| 				}
 | |
| 				if err != nil {
 | |
| 					return fmt.Errorf("parse: %w", err)
 | |
| 				}
 | |
| 				if e != textparse.EntrySeries {
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				_, ts, v := p.Series()
 | |
| 				if ts == nil {
 | |
| 					l := labels.Labels{}
 | |
| 					p.Metric(&l)
 | |
| 					return fmt.Errorf("expected timestamp for series %v, got none", l)
 | |
| 				}
 | |
| 				if *ts < t {
 | |
| 					continue
 | |
| 				}
 | |
| 				if *ts >= tsUpper {
 | |
| 					if *ts < nextSampleTs {
 | |
| 						nextSampleTs = *ts
 | |
| 					}
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				l := labels.Labels{}
 | |
| 				p.Metric(&l)
 | |
| 
 | |
| 				if _, err := app.Append(0, l, *ts, v); err != nil {
 | |
| 					return fmt.Errorf("add sample: %w", err)
 | |
| 				}
 | |
| 
 | |
| 				samplesCount++
 | |
| 				if samplesCount < maxSamplesInAppender {
 | |
| 					continue
 | |
| 				}
 | |
| 
 | |
| 				// If we arrive here, the samples count is greater than the maxSamplesInAppender.
 | |
| 				// Therefore the old appender is committed and a new one is created.
 | |
| 				// This prevents keeping too many samples lined up in an appender and thus in RAM.
 | |
| 				if err := app.Commit(); err != nil {
 | |
| 					return fmt.Errorf("commit: %w", err)
 | |
| 				}
 | |
| 
 | |
| 				app = w.Appender(ctx)
 | |
| 				samplesCount = 0
 | |
| 			}
 | |
| 
 | |
| 			if err := app.Commit(); err != nil {
 | |
| 				return fmt.Errorf("commit: %w", err)
 | |
| 			}
 | |
| 
 | |
| 			block, err := w.Flush(ctx)
 | |
| 			switch {
 | |
| 			case err == nil:
 | |
| 				if quiet {
 | |
| 					break
 | |
| 				}
 | |
| 				blocks, err := db.Blocks()
 | |
| 				if err != nil {
 | |
| 					return fmt.Errorf("get blocks: %w", err)
 | |
| 				}
 | |
| 				for _, b := range blocks {
 | |
| 					if b.Meta().ULID == block {
 | |
| 						printBlocks([]tsdb.BlockReader{b}, !wroteHeader, humanReadable)
 | |
| 						wroteHeader = true
 | |
| 						break
 | |
| 					}
 | |
| 				}
 | |
| 			case errors.Is(err, tsdb.ErrNoSeriesAppended):
 | |
| 			default:
 | |
| 				return fmt.Errorf("flush: %w", err)
 | |
| 			}
 | |
| 
 | |
| 			return nil
 | |
| 		}()
 | |
| 		if err != nil {
 | |
| 			return fmt.Errorf("process blocks: %w", err)
 | |
| 		}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func backfill(maxSamplesInAppender int, input []byte, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration) (err error) {
 | |
| 	p := textparse.NewOpenMetricsParser(input)
 | |
| 	maxt, mint, err := getMinAndMaxTimestamps(p)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("getting min and max timestamp: %w", err)
 | |
| 	}
 | |
| 	if err = createBlocks(input, mint, maxt, int64(maxBlockDuration/time.Millisecond), maxSamplesInAppender, outputDir, humanReadable, quiet); err != nil {
 | |
| 		return fmt.Errorf("block creation: %w", err)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 |