mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-10-31 00:11:23 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			231 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			231 lines
		
	
	
		
			6.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2017 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| //     http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| package api_v2
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"math"
 | |
| 	"math/rand"
 | |
| 	"net/http"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"time"
 | |
| 
 | |
| 	old_ctx "golang.org/x/net/context"
 | |
| 
 | |
| 	"google.golang.org/grpc"
 | |
| 	"google.golang.org/grpc/codes"
 | |
| 	"google.golang.org/grpc/status"
 | |
| 
 | |
| 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 | |
| 	"github.com/grpc-ecosystem/grpc-gateway/runtime"
 | |
| 	"github.com/pkg/errors"
 | |
| 	"github.com/prometheus/tsdb"
 | |
| 	tsdbLabels "github.com/prometheus/tsdb/labels"
 | |
| 
 | |
| 	"github.com/prometheus/prometheus/pkg/labels"
 | |
| 	"github.com/prometheus/prometheus/pkg/timestamp"
 | |
| 	pb "github.com/prometheus/prometheus/prompb"
 | |
| )
 | |
| 
 | |
| // API encapsulates all API services.
 | |
| type API struct {
 | |
| 	enableAdmin bool
 | |
| 	db          func() *tsdb.DB
 | |
| }
 | |
| 
 | |
| // New returns a new API object.
 | |
| func New(
 | |
| 	db func() *tsdb.DB,
 | |
| 	enableAdmin bool,
 | |
| ) *API {
 | |
| 	return &API{
 | |
| 		db:          db,
 | |
| 		enableAdmin: enableAdmin,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // RegisterGRPC registers all API services with the given server.
 | |
| func (api *API) RegisterGRPC(srv *grpc.Server) {
 | |
| 	if api.enableAdmin {
 | |
| 		pb.RegisterAdminServer(srv, NewAdmin(api.db))
 | |
| 	} else {
 | |
| 		pb.RegisterAdminServer(srv, &AdminDisabled{})
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // HTTPHandler returns an HTTP handler for a REST API gateway to the given grpc address.
 | |
| func (api *API) HTTPHandler(grpcAddr string) (http.Handler, error) {
 | |
| 	ctx := context.Background()
 | |
| 
 | |
| 	enc := new(protoutil.JSONPb)
 | |
| 	mux := runtime.NewServeMux(runtime.WithMarshalerOption(enc.ContentType(), enc))
 | |
| 
 | |
| 	opts := []grpc.DialOption{grpc.WithInsecure()}
 | |
| 
 | |
| 	err := pb.RegisterAdminHandlerFromEndpoint(ctx, mux, grpcAddr, opts)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return mux, nil
 | |
| }
 | |
| 
 | |
| // extractTimeRange returns minimum and maximum timestamp in milliseconds as
 | |
| // provided by the time range. It defaults either boundary to the minimum and maximum
 | |
| // possible value.
 | |
| func extractTimeRange(min, max *time.Time) (mint, maxt time.Time, err error) {
 | |
| 	if min == nil {
 | |
| 		mint = minTime
 | |
| 	} else {
 | |
| 		mint = *min
 | |
| 	}
 | |
| 	if max == nil {
 | |
| 		maxt = maxTime
 | |
| 	} else {
 | |
| 		maxt = *max
 | |
| 	}
 | |
| 	if mint.After(maxt) {
 | |
| 		return mint, maxt, errors.Errorf("min time must be before max time")
 | |
| 	}
 | |
| 	return mint, maxt, nil
 | |
| }
 | |
| 
 | |
| var (
 | |
| 	minTime = time.Unix(math.MinInt64/1000+62135596801, 0)
 | |
| 	maxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999)
 | |
| )
 | |
| 
 | |
| func labelsToProto(lset labels.Labels) pb.Labels {
 | |
| 	r := pb.Labels{
 | |
| 		Labels: make([]pb.Label, 0, len(lset)),
 | |
| 	}
 | |
| 	for _, l := range lset {
 | |
| 		r.Labels = append(r.Labels, pb.Label{Name: l.Name, Value: l.Value})
 | |
| 	}
 | |
| 	return r
 | |
| }
 | |
| 
 | |
| // AdminDisabled implements the administration interface that informs
 | |
| // that the API endpoints are disabled.
 | |
| type AdminDisabled struct {
 | |
| }
 | |
| 
 | |
| // TSDBSnapshot implements pb.AdminServer.
 | |
| func (s *AdminDisabled) TSDBSnapshot(_ old_ctx.Context, _ *pb.TSDBSnapshotRequest) (*pb.TSDBSnapshotResponse, error) {
 | |
| 	return nil, status.Error(codes.Unavailable, "Admin APIs are disabled")
 | |
| }
 | |
| 
 | |
| // TSDBCleanTombstones implements pb.AdminServer.
 | |
| func (s *AdminDisabled) TSDBCleanTombstones(_ old_ctx.Context, _ *pb.TSDBCleanTombstonesRequest) (*pb.TSDBCleanTombstonesResponse, error) {
 | |
| 	return nil, status.Error(codes.Unavailable, "Admin APIs are disabled")
 | |
| }
 | |
| 
 | |
| // DeleteSeries imeplements pb.AdminServer.
 | |
| func (s *AdminDisabled) DeleteSeries(_ old_ctx.Context, r *pb.SeriesDeleteRequest) (*pb.SeriesDeleteResponse, error) {
 | |
| 	return nil, status.Error(codes.Unavailable, "Admin APIs are disabled")
 | |
| }
 | |
| 
 | |
| // Admin provides an administration interface to Prometheus.
 | |
| type Admin struct {
 | |
| 	db func() *tsdb.DB
 | |
| }
 | |
| 
 | |
| // NewAdmin returns a Admin server.
 | |
| func NewAdmin(db func() *tsdb.DB) *Admin {
 | |
| 	return &Admin{
 | |
| 		db: db,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // TSDBSnapshot implements pb.AdminServer.
 | |
| func (s *Admin) TSDBSnapshot(_ old_ctx.Context, req *pb.TSDBSnapshotRequest) (*pb.TSDBSnapshotResponse, error) {
 | |
| 	db := s.db()
 | |
| 	if db == nil {
 | |
| 		return nil, status.Errorf(codes.Unavailable, "TSDB not ready")
 | |
| 	}
 | |
| 	var (
 | |
| 		snapdir = filepath.Join(db.Dir(), "snapshots")
 | |
| 		name    = fmt.Sprintf("%s-%x",
 | |
| 			time.Now().UTC().Format("20060102T150405Z0700"),
 | |
| 			rand.Int())
 | |
| 		dir = filepath.Join(snapdir, name)
 | |
| 	)
 | |
| 	if err := os.MkdirAll(dir, 0777); err != nil {
 | |
| 		return nil, status.Errorf(codes.Internal, "created snapshot directory: %s", err)
 | |
| 	}
 | |
| 	if err := db.Snapshot(dir, !req.SkipHead); err != nil {
 | |
| 		return nil, status.Errorf(codes.Internal, "create snapshot: %s", err)
 | |
| 	}
 | |
| 	return &pb.TSDBSnapshotResponse{Name: name}, nil
 | |
| }
 | |
| 
 | |
| // TSDBCleanTombstones implements pb.AdminServer.
 | |
| func (s *Admin) TSDBCleanTombstones(_ old_ctx.Context, _ *pb.TSDBCleanTombstonesRequest) (*pb.TSDBCleanTombstonesResponse, error) {
 | |
| 	db := s.db()
 | |
| 	if db == nil {
 | |
| 		return nil, status.Errorf(codes.Unavailable, "TSDB not ready")
 | |
| 	}
 | |
| 
 | |
| 	if err := db.CleanTombstones(); err != nil {
 | |
| 		return nil, status.Errorf(codes.Internal, "clean tombstones: %s", err)
 | |
| 	}
 | |
| 
 | |
| 	return &pb.TSDBCleanTombstonesResponse{}, nil
 | |
| }
 | |
| 
 | |
| // DeleteSeries implements pb.AdminServer.
 | |
| func (s *Admin) DeleteSeries(_ old_ctx.Context, r *pb.SeriesDeleteRequest) (*pb.SeriesDeleteResponse, error) {
 | |
| 	mint, maxt, err := extractTimeRange(r.MinTime, r.MaxTime)
 | |
| 	if err != nil {
 | |
| 		return nil, status.Error(codes.InvalidArgument, err.Error())
 | |
| 	}
 | |
| 	var matchers tsdbLabels.Selector
 | |
| 
 | |
| 	for _, m := range r.Matchers {
 | |
| 		var lm tsdbLabels.Matcher
 | |
| 		var err error
 | |
| 
 | |
| 		switch m.Type {
 | |
| 		case pb.LabelMatcher_EQ:
 | |
| 			lm = tsdbLabels.NewEqualMatcher(m.Name, m.Value)
 | |
| 		case pb.LabelMatcher_NEQ:
 | |
| 			lm = tsdbLabels.Not(tsdbLabels.NewEqualMatcher(m.Name, m.Value))
 | |
| 		case pb.LabelMatcher_RE:
 | |
| 			lm, err = tsdbLabels.NewRegexpMatcher(m.Name, m.Value)
 | |
| 			if err != nil {
 | |
| 				return nil, status.Errorf(codes.InvalidArgument, "bad regexp matcher: %s", err)
 | |
| 			}
 | |
| 		case pb.LabelMatcher_NRE:
 | |
| 			lm, err = tsdbLabels.NewRegexpMatcher(m.Name, m.Value)
 | |
| 			if err != nil {
 | |
| 				return nil, status.Errorf(codes.InvalidArgument, "bad regexp matcher: %s", err)
 | |
| 			}
 | |
| 			lm = tsdbLabels.Not(lm)
 | |
| 		default:
 | |
| 			return nil, status.Error(codes.InvalidArgument, "unknown matcher type")
 | |
| 		}
 | |
| 
 | |
| 		matchers = append(matchers, lm)
 | |
| 	}
 | |
| 	db := s.db()
 | |
| 	if db == nil {
 | |
| 		return nil, status.Errorf(codes.Unavailable, "TSDB not ready")
 | |
| 	}
 | |
| 	if err := db.Delete(timestamp.FromTime(mint), timestamp.FromTime(maxt), matchers...); err != nil {
 | |
| 		return nil, status.Error(codes.Internal, err.Error())
 | |
| 	}
 | |
| 	return &pb.SeriesDeleteResponse{}, nil
 | |
| }
 |