mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-10-31 00:11:23 +01:00 
			
		
		
		
	Signed-off-by: Cosrider <cosrider7@gmail.com> Signed-off-by: Cosrider <cosrider7@gmail.com>
		
			
				
	
	
		
			201 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			201 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2019 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package promql
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 	"unicode/utf8"
 | |
| 
 | |
| 	"github.com/edsrzf/mmap-go"
 | |
| 	"github.com/go-kit/log"
 | |
| 	"github.com/go-kit/log/level"
 | |
| )
 | |
| 
 | |
| type ActiveQueryTracker struct {
 | |
| 	mmapedFile    []byte
 | |
| 	getNextIndex  chan int
 | |
| 	logger        log.Logger
 | |
| 	maxConcurrent int
 | |
| }
 | |
| 
 | |
| type Entry struct {
 | |
| 	Query     string `json:"query"`
 | |
| 	Timestamp int64  `json:"timestamp_sec"`
 | |
| }
 | |
| 
 | |
| const (
 | |
| 	entrySize int = 1000
 | |
| )
 | |
| 
 | |
| func parseBrokenJSON(brokenJSON []byte) (string, bool) {
 | |
| 	queries := strings.ReplaceAll(string(brokenJSON), "\x00", "")
 | |
| 	if len(queries) > 0 {
 | |
| 		queries = queries[:len(queries)-1] + "]"
 | |
| 	}
 | |
| 
 | |
| 	// Conditional because of implementation detail: len() = 1 implies file consisted of a single char: '['.
 | |
| 	if len(queries) <= 1 {
 | |
| 		return "[]", false
 | |
| 	}
 | |
| 
 | |
| 	return queries, true
 | |
| }
 | |
| 
 | |
| func logUnfinishedQueries(filename string, filesize int, logger log.Logger) {
 | |
| 	if _, err := os.Stat(filename); err == nil {
 | |
| 		fd, err := os.Open(filename)
 | |
| 		if err != nil {
 | |
| 			level.Error(logger).Log("msg", "Failed to open query log file", "err", err)
 | |
| 			return
 | |
| 		}
 | |
| 		defer fd.Close()
 | |
| 
 | |
| 		brokenJSON := make([]byte, filesize)
 | |
| 		_, err = fd.Read(brokenJSON)
 | |
| 		if err != nil {
 | |
| 			level.Error(logger).Log("msg", "Failed to read query log file", "err", err)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		queries, queriesExist := parseBrokenJSON(brokenJSON)
 | |
| 		if !queriesExist {
 | |
| 			return
 | |
| 		}
 | |
| 		level.Info(logger).Log("msg", "These queries didn't finish in prometheus' last run:", "queries", queries)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func getMMapedFile(filename string, filesize int, logger log.Logger) ([]byte, error) {
 | |
| 	file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o666)
 | |
| 	if err != nil {
 | |
| 		absPath, pathErr := filepath.Abs(filename)
 | |
| 		if pathErr != nil {
 | |
| 			absPath = filename
 | |
| 		}
 | |
| 		level.Error(logger).Log("msg", "Error opening query log file", "file", absPath, "err", err)
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	err = file.Truncate(int64(filesize))
 | |
| 	if err != nil {
 | |
| 		level.Error(logger).Log("msg", "Error setting filesize.", "filesize", filesize, "err", err)
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	fileAsBytes, err := mmap.Map(file, mmap.RDWR, 0)
 | |
| 	if err != nil {
 | |
| 		level.Error(logger).Log("msg", "Failed to mmap", "file", filename, "Attempted size", filesize, "err", err)
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return fileAsBytes, err
 | |
| }
 | |
| 
 | |
| func NewActiveQueryTracker(localStoragePath string, maxConcurrent int, logger log.Logger) *ActiveQueryTracker {
 | |
| 	err := os.MkdirAll(localStoragePath, 0o777)
 | |
| 	if err != nil {
 | |
| 		level.Error(logger).Log("msg", "Failed to create directory for logging active queries")
 | |
| 	}
 | |
| 
 | |
| 	filename, filesize := filepath.Join(localStoragePath, "queries.active"), 1+maxConcurrent*entrySize
 | |
| 	logUnfinishedQueries(filename, filesize, logger)
 | |
| 
 | |
| 	fileAsBytes, err := getMMapedFile(filename, filesize, logger)
 | |
| 	if err != nil {
 | |
| 		panic("Unable to create mmap-ed active query log")
 | |
| 	}
 | |
| 
 | |
| 	copy(fileAsBytes, "[")
 | |
| 	activeQueryTracker := ActiveQueryTracker{
 | |
| 		mmapedFile:    fileAsBytes,
 | |
| 		getNextIndex:  make(chan int, maxConcurrent),
 | |
| 		logger:        logger,
 | |
| 		maxConcurrent: maxConcurrent,
 | |
| 	}
 | |
| 
 | |
| 	activeQueryTracker.generateIndices(maxConcurrent)
 | |
| 
 | |
| 	return &activeQueryTracker
 | |
| }
 | |
| 
 | |
| func trimStringByBytes(str string, size int) string {
 | |
| 	bytesStr := []byte(str)
 | |
| 
 | |
| 	trimIndex := len(bytesStr)
 | |
| 	if size < len(bytesStr) {
 | |
| 		for !utf8.RuneStart(bytesStr[size]) {
 | |
| 			size--
 | |
| 		}
 | |
| 		trimIndex = size
 | |
| 	}
 | |
| 
 | |
| 	return string(bytesStr[:trimIndex])
 | |
| }
 | |
| 
 | |
| func _newJSONEntry(query string, timestamp int64, logger log.Logger) []byte {
 | |
| 	entry := Entry{query, timestamp}
 | |
| 	jsonEntry, err := json.Marshal(entry)
 | |
| 	if err != nil {
 | |
| 		level.Error(logger).Log("msg", "Cannot create json of query", "query", query)
 | |
| 		return []byte{}
 | |
| 	}
 | |
| 
 | |
| 	return jsonEntry
 | |
| }
 | |
| 
 | |
| func newJSONEntry(query string, logger log.Logger) []byte {
 | |
| 	timestamp := time.Now().Unix()
 | |
| 	minEntryJSON := _newJSONEntry("", timestamp, logger)
 | |
| 
 | |
| 	query = trimStringByBytes(query, entrySize-(len(minEntryJSON)+1))
 | |
| 	jsonEntry := _newJSONEntry(query, timestamp, logger)
 | |
| 
 | |
| 	return jsonEntry
 | |
| }
 | |
| 
 | |
| func (tracker ActiveQueryTracker) generateIndices(maxConcurrent int) {
 | |
| 	for i := 0; i < maxConcurrent; i++ {
 | |
| 		tracker.getNextIndex <- 1 + (i * entrySize)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (tracker ActiveQueryTracker) GetMaxConcurrent() int {
 | |
| 	return tracker.maxConcurrent
 | |
| }
 | |
| 
 | |
| func (tracker ActiveQueryTracker) Delete(insertIndex int) {
 | |
| 	copy(tracker.mmapedFile[insertIndex:], strings.Repeat("\x00", entrySize))
 | |
| 	tracker.getNextIndex <- insertIndex
 | |
| }
 | |
| 
 | |
| func (tracker ActiveQueryTracker) Insert(ctx context.Context, query string) (int, error) {
 | |
| 	select {
 | |
| 	case i := <-tracker.getNextIndex:
 | |
| 		fileBytes := tracker.mmapedFile
 | |
| 		entry := newJSONEntry(query, tracker.logger)
 | |
| 		start, end := i, i+entrySize
 | |
| 
 | |
| 		copy(fileBytes[start:], entry)
 | |
| 		copy(fileBytes[end-1:], ",")
 | |
| 		return i, nil
 | |
| 	case <-ctx.Done():
 | |
| 		return 0, ctx.Err()
 | |
| 	}
 | |
| }
 |