prometheus/retrieval/scrape.go
Brian Brazil b87d3ca9ea Create stale markers when a target is stopped.
When a target is no longer returned from SD, stop()
is called. However, it may be recreated before the
next scrape interval happens, so we wait to set stale markers
until the scrape of the new target would have happened
and been ingested, which is 2 scrape intervals.

If we're shutting down the context will be cancelled,
so return immediately rather than holding things up for potentially
minutes waiting to safely set stalemarkers no newer than now.
If the server starts immediately back up again all is well.
If not, we're missing some stale markers.
2017-05-16 18:33:51 +01:00

762 lines
19 KiB
Go

// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package retrieval
import (
"bufio"
"bytes"
"compress/gzip"
"fmt"
"io"
"math"
"net/http"
"sync"
"time"
"unsafe"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/prometheus/common/version"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/httputil"
)
// Names of the synthetic series that scrapeLoop.report writes after every scrape.
const (
// scrapeHealthMetricName is written as 1 when report receives a nil error and 0 otherwise.
scrapeHealthMetricName = "up"
// scrapeDurationMetricName carries the scrape duration in seconds.
scrapeDurationMetricName = "scrape_duration_seconds"
// scrapeSamplesMetricName carries the number of samples parsed from the scrape.
scrapeSamplesMetricName = "scrape_samples_scraped"
// samplesPostRelabelMetricName carries the number of samples that were actually appended.
samplesPostRelabelMetricName = "scrape_samples_post_metric_relabeling"
)
// Self-monitoring metrics of the scrape machinery. All of them are registered
// with the default Prometheus registry in init.
var (
// targetIntervalLength observes the wall-clock time between two consecutive
// scrape starts of a loop, labelled by the configured interval.
targetIntervalLength = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "prometheus_target_interval_length_seconds",
Help: "Actual intervals between scrapes.",
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
},
[]string{"interval"},
)
// targetReloadIntervalLength observes how long scrapePool.reload took,
// labelled by the new scrape interval.
targetReloadIntervalLength = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "prometheus_target_reload_length_seconds",
Help: "Actual interval to reload the scrape pool with a given configuration.",
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
},
[]string{"interval"},
)
// targetSyncIntervalLength observes how long scrapePool.Sync took,
// labelled by job name.
targetSyncIntervalLength = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: "prometheus_target_sync_length_seconds",
Help: "Actual interval to sync the scrape pool.",
Objectives: map[float64]float64{0.01: 0.001, 0.05: 0.005, 0.5: 0.05, 0.90: 0.01, 0.99: 0.001},
},
[]string{"scrape_job"},
)
// targetScrapePoolSyncsCounter is incremented once per scrapePool.Sync call,
// labelled by job name.
targetScrapePoolSyncsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "prometheus_target_scrape_pool_sync_total",
Help: "Total number of syncs that were executed on a scrape pool.",
},
[]string{"scrape_job"},
)
// targetScrapeSampleLimit counts scrapes rejected for exceeding the sample
// limit. It is not incremented anywhere in the visible part of this file;
// presumably the limit appender does so — verify against its implementation.
targetScrapeSampleLimit = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "prometheus_target_scrapes_exceeded_sample_limit_total",
Help: "Total number of scrapes that hit the sample limit and were rejected.",
},
)
)
// init registers the scrape self-monitoring collectors with the default
// registry. Registration panics if any collector cannot be registered.
func init() {
	collectors := []prometheus.Collector{
		targetIntervalLength,
		targetReloadIntervalLength,
		targetSyncIntervalLength,
		targetScrapePoolSyncsCounter,
		targetScrapeSampleLimit,
	}
	for _, c := range collectors {
		prometheus.MustRegister(c)
	}
}
// scrapePool manages scrapes for sets of targets.
type scrapePool struct {
// appendable hands out the storage appenders used for scraped and report samples.
appendable Appendable
// ctx is passed to every scrape loop created by the pool.
ctx context.Context
// mtx is held by stop, reload and sync while they modify the fields below.
mtx sync.RWMutex
// config is the scrape configuration currently in effect; replaced by reload.
config *config.ScrapeConfig
// client is the HTTP client shared by all target scrapers of this pool.
client *http.Client
// Targets and loops must always be synchronized to have the same
// set of hashes.
targets map[uint64]*Target
loops map[uint64]loop
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender) loop
}
// newScrapePool builds an empty scrape pool for the given scrape configuration.
// A failure to construct the HTTP client is only logged; the pool is still
// returned, holding whatever client value the constructor produced.
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable) *scrapePool {
	httpClient, err := NewHTTPClient(cfg.HTTPClientConfig)
	if err != nil {
		// Any errors that could occur here should be caught during config validation.
		log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
	}

	pool := new(scrapePool)
	pool.appendable = app
	pool.config = cfg
	pool.ctx = ctx
	pool.client = httpClient
	pool.targets = make(map[uint64]*Target)
	pool.loops = make(map[uint64]loop)
	pool.newLoop = newScrapeLoop
	return pool
}
// stop terminates all scrape loops concurrently, forgets every target and
// loop of the pool, and returns once all loops have reported that they stopped.
// The pool lock is held for the whole duration.
func (sp *scrapePool) stop() {
	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	var pending sync.WaitGroup
	for hash, running := range sp.loops {
		pending.Add(1)

		go func(l loop) {
			l.stop()
			pending.Done()
		}(running)

		delete(sp.loops, hash)
		delete(sp.targets, hash)
	}

	pending.Wait()
}
// reload swaps in a new scrape configuration. The set of targets is kept, but
// every scrape loop is replaced by a fresh one built from the new settings.
// Each replacement loop is started as soon as its predecessor has stopped.
// The method returns once all old loops have stopped scraping and records
// how long the reload took.
func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
	begin := time.Now()

	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	httpClient, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
	if err != nil {
		// Any errors that could occur here should be caught during config validation.
		log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
	}
	sp.config = cfg
	sp.client = httpClient

	interval := time.Duration(sp.config.ScrapeInterval)
	timeout := time.Duration(sp.config.ScrapeTimeout)

	var stopping sync.WaitGroup
	for fp, previous := range sp.loops {
		// target is declared per iteration so the appender closures below
		// each capture their own target.
		target := sp.targets[fp]
		sc := &targetScraper{Target: target, client: sp.client, timeout: timeout}
		replacement := sp.newLoop(sp.ctx, sc,
			func() storage.Appender { return sp.sampleAppender(target) },
			func() storage.Appender { return sp.reportAppender(target) },
		)

		stopping.Add(1)
		go func(prev, next loop) {
			prev.stop()
			stopping.Done()

			go next.run(interval, timeout, nil)
		}(previous, replacement)

		sp.loops[fp] = replacement
	}
	stopping.Wait()

	elapsed := time.Since(begin).Seconds()
	targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(elapsed)
}
// Sync turns the given target groups into scrape targets and brings the set
// of running scrape loops in line with them. Groups whose targets cannot be
// created are logged and skipped. The duration of the call and the number of
// syncs are recorded per job.
func (sp *scrapePool) Sync(tgs []*config.TargetGroup) {
	begin := time.Now()

	var collected []*Target
	for _, group := range tgs {
		built, err := targetsFromGroup(group, sp.config)
		if err == nil {
			collected = append(collected, built...)
			continue
		}
		log.With("err", err).Error("creating targets failed")
	}
	sp.sync(collected)

	elapsed := time.Since(begin).Seconds()
	targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(elapsed)
	targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}
// sync reconciles the pool with the given, possibly duplicated, target list:
// targets not yet known get a scrape loop that is started immediately, and
// known targets missing from the list have their loop stopped and are removed.
// It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) {
	sp.mtx.Lock()
	defer sp.mtx.Unlock()

	interval := time.Duration(sp.config.ScrapeInterval)
	timeout := time.Duration(sp.config.ScrapeTimeout)
	wanted := map[uint64]struct{}{}

	for i := range targets {
		// A fresh variable per iteration, captured by the appender closures.
		target := targets[i]
		fp := target.hash()
		wanted[fp] = struct{}{}

		if _, known := sp.targets[fp]; known {
			continue
		}

		sc := &targetScraper{Target: target, client: sp.client, timeout: timeout}
		created := sp.newLoop(sp.ctx, sc,
			func() storage.Appender { return sp.sampleAppender(target) },
			func() storage.Appender { return sp.reportAppender(target) },
		)

		sp.targets[fp] = target
		sp.loops[fp] = created

		go created.run(interval, timeout, nil)
	}

	// Stop and remove old targets and scraper loops.
	var stopping sync.WaitGroup
	for fp := range sp.targets {
		if _, keep := wanted[fp]; keep {
			continue
		}

		stopping.Add(1)
		go func(l loop) {
			l.stop()
			stopping.Done()
		}(sp.loops[fp])

		delete(sp.loops, fp)
		delete(sp.targets, fp)
	}

	// Wait for all potentially stopped scrapers to terminate.
	// This covers the case of flapping targets. If the server is under high load, a new scraper
	// may be active and tries to insert. The old scraper that didn't terminate yet could still
	// be inserting a previous sample set.
	stopping.Wait()
}
// sampleAppender builds the appender chain used for samples scraped from the
// target. From innermost to outermost the chain is: storage appender, optional
// sample limit, optional metric relabeling, then target label handling
// (honoring or overriding scraped labels depending on the configuration).
// It panics if the underlying appender cannot be obtained.
func (sp *scrapePool) sampleAppender(target *Target) storage.Appender {
	chain, err := sp.appendable.Appender()
	if err != nil {
		panic(err)
	}

	// The limit is applied after metrics are potentially dropped via relabeling.
	if limit := sp.config.SampleLimit; limit > 0 {
		chain = &limitAppender{Appender: chain, limit: int(limit)}
	}

	// The relabelAppender has to be inside the label-modifying appenders
	// so the relabeling rules are applied to the correct label set.
	if rules := sp.config.MetricRelabelConfigs; len(rules) > 0 {
		chain = relabelAppender{Appender: chain, relabelings: rules}
	}

	if !sp.config.HonorLabels {
		return ruleLabelsAppender{Appender: chain, labels: target.Labels()}
	}
	return honorLabelsAppender{Appender: chain, labels: target.Labels()}
}
// reportAppender builds the appender used for the per-scrape report samples
// of the target: a storage appender wrapped so that the target's labels are
// attached. It panics if the underlying appender cannot be obtained.
func (sp *scrapePool) reportAppender(target *Target) storage.Appender {
	base, err := sp.appendable.Appender()
	if err != nil {
		panic(err)
	}

	wrapped := ruleLabelsAppender{Appender: base, labels: target.Labels()}
	return wrapped
}
// A scraper retrieves samples and accepts a status report at the end.
type scraper interface {
// scrape fetches the target's exposition and writes the raw payload to w.
scrape(ctx context.Context, w io.Writer) error
// report is told when a scrape started, how long it took and the error it ended with (nil on success).
report(start time.Time, dur time.Duration, err error)
// offset returns how long a scrape loop waits before its first scrape for the given interval.
offset(interval time.Duration) time.Duration
}
// targetScraper implements the scraper interface for a target.
type targetScraper struct {
*Target
// client executes the scrape requests.
client *http.Client
// req is the GET request, built lazily on the first scrape and reused afterwards.
req *http.Request
// timeout is advertised to the target via the X-Prometheus-Scrape-Timeout-Seconds header.
timeout time.Duration
// gzipr and buf are reused across scrapes to decode gzip-encoded responses.
gzipr *gzip.Reader
buf *bufio.Reader
}
// acceptHeader is the Accept value for content negotiation. It is currently
// not sent: targetScraper.scrape has the corresponding header line commented out.
const acceptHeader = `application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited;q=0.7,text/plain;version=0.0.4;q=0.3,*/*;q=0.1`
// userAgentHeader is the User-Agent value sent with every scrape request.
var userAgentHeader = fmt.Sprintf("Prometheus/%s", version.Version)
// scrape performs one HTTP GET against the target and copies the response
// body into w, transparently decompressing it when the response declares
// "Content-Encoding: gzip".
//
// The request is built on the first call and reused afterwards; the buffered
// reader and gzip reader are likewise created once and reset for every
// subsequent compressed response.
//
// It returns an error if the request cannot be built or executed, if the
// server answers with a status other than 200 OK, or if reading or
// decompressing the body fails.
func (s *targetScraper) scrape(ctx context.Context, w io.Writer) error {
	if s.req == nil {
		req, err := http.NewRequest("GET", s.URL().String(), nil)
		if err != nil {
			return err
		}
		// Disable accept header to always negotiate for text format.
		// req.Header.Add("Accept", acceptHeader)
		req.Header.Add("Accept-Encoding", "gzip")
		req.Header.Set("User-Agent", userAgentHeader)
		req.Header.Set("X-Prometheus-Scrape-Timeout-Seconds", fmt.Sprintf("%f", s.timeout.Seconds()))

		s.req = req
	}

	resp, err := ctxhttp.Do(ctx, s.client, s.req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("server returned HTTP status %s", resp.Status)
	}

	if resp.Header.Get("Content-Encoding") != "gzip" {
		_, err = io.Copy(w, resp.Body)
		return err
	}

	if s.gzipr == nil {
		s.buf = bufio.NewReader(resp.Body)
		s.gzipr, err = gzip.NewReader(s.buf)
		if err != nil {
			return err
		}
	} else {
		s.buf.Reset(resp.Body)
		// Reset reads the gzip header of the new stream and can fail on a
		// malformed response; surface that immediately instead of relying on
		// the subsequent read to repeat the error.
		if err = s.gzipr.Reset(s.buf); err != nil {
			return err
		}
	}

	_, err = io.Copy(w, s.gzipr)
	s.gzipr.Close()
	return err
}
// A loop can run and be stopped again. It must not be reused after it was stopped.
type loop interface {
// run scrapes at the given interval with the given per-scrape timeout.
// If errc is non-nil, scrape errors are sent on it.
run(interval, timeout time.Duration, errc chan<- error)
// stop ends scraping and blocks until the loop has acknowledged it.
stop()
}
// lsetCacheEntry pairs a series' label set with its string rendering, so the
// string does not have to be rebuilt for every scraped sample.
type lsetCacheEntry struct {
lset labels.Labels
// str is lset.String(), used as the key for staleness tracking.
str string
}
// scrapeLoop periodically scrapes a single scraper and appends the results.
type scrapeLoop struct {
scraper scraper
// appender returns the appender for scraped samples and stale markers.
appender func() storage.Appender
// reportAppender returns the appender for the per-scrape report samples.
reportAppender func() storage.Appender
// TODO: Keep only the values from the last scrape to avoid a memory leak.
refCache map[string]uint64 // Parsed string to ref.
lsetCache map[uint64]lsetCacheEntry // Ref to labelset and string
// samplesInPreviousScrape holds the series tracked by the last successfully
// committed append, keyed by label set string; append compares against it
// to decide which series need a stale marker.
samplesInPreviousScrape map[string]labels.Labels
// ctx is the parent context; once it is done the loop exits without
// writing stale markers.
ctx context.Context
// scrapeCtx is derived from ctx and is cancelled by stop to end scraping.
scrapeCtx context.Context
// cancel cancels scrapeCtx.
cancel func()
// stopped is closed by run once no further scrapes will be started.
stopped chan struct{}
}
// newScrapeLoop creates a scrape loop for the given scraper. app supplies the
// appender for scraped samples, reportApp the one for report samples. The
// loop's scrape context is derived from ctx so that stop can end scraping
// without cancelling ctx itself.
func newScrapeLoop(ctx context.Context, sc scraper, app, reportApp func() storage.Appender) loop {
	scrapeCtx, cancel := context.WithCancel(ctx)

	return &scrapeLoop{
		scraper:        sc,
		appender:       app,
		reportAppender: reportApp,
		refCache:       make(map[string]uint64),
		lsetCache:      make(map[uint64]lsetCacheEntry),
		stopped:        make(chan struct{}),
		ctx:            ctx,
		scrapeCtx:      scrapeCtx,
		cancel:         cancel,
	}
}
// run executes the scrape loop until it is stopped or its parent context is
// cancelled.
//
// After waiting for the scraper's offset, it scrapes once per interval, each
// scrape bounded by timeout. The scraped payload is appended, and a report
// (health, duration, sample counts) is written for every scrape. If errc is
// non-nil, scrape errors are sent on it; the send blocks until received.
//
// The error passed to the report is the scrape error if the scrape failed,
// otherwise the error of appending the scraped payload. The outcome of the
// fallback empty append is only logged, so that neither a failed scrape nor
// an unparsable payload is reported as healthy.
//
// Once stopped via stop (scrapeCtx cancelled), sl.stopped is closed and the
// loop waits for two more ticks plus 10% of the interval before writing
// stale markers, unless the parent context is cancelled in the meantime.
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
	select {
	case <-time.After(sl.scraper.offset(interval)):
		// Continue after a scraping offset.
	case <-sl.scrapeCtx.Done():
		close(sl.stopped)
		return
	}

	var last time.Time

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	buf := bytes.NewBuffer(make([]byte, 0, 16000))

mainLoop:
	for {
		buf.Reset()
		select {
		case <-sl.ctx.Done():
			close(sl.stopped)
			return
		case <-sl.scrapeCtx.Done():
			break mainLoop
		default:
		}

		var (
			start             = time.Now()
			scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
		)

		// Only record after the first scrape.
		if !last.IsZero() {
			targetIntervalLength.WithLabelValues(interval.String()).Observe(
				time.Since(last).Seconds(),
			)
		}

		scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
		cancel()

		var b []byte
		if scrapeErr == nil {
			b = buf.Bytes()
		} else if errc != nil {
			errc <- scrapeErr
		}

		// A failed scrape is the same as an empty scrape,
		// we still call sl.append to trigger stale markers.
		total, added, appErr := sl.append(b, start)
		if appErr != nil {
			log.With("err", appErr).Error("append failed")
			// The append failed, probably due to a parse error.
			// Call sl.append again with an empty scrape to trigger stale markers.
			if _, _, err := sl.append([]byte{}, start); err != nil {
				log.With("err", err).Error("failure append failed")
			}
		}

		// Report the first failure of this iteration. The scrape error takes
		// precedence; it must not be masked by a successful (empty) append.
		if scrapeErr == nil {
			scrapeErr = appErr
		}
		sl.report(start, time.Since(start), total, added, scrapeErr)
		last = start

		select {
		case <-sl.ctx.Done():
			close(sl.stopped)
			return
		case <-sl.scrapeCtx.Done():
			break mainLoop
		case <-ticker.C:
		}
	}

	close(sl.stopped)

	// Scraping has stopped. We want to write stale markers but
	// the target may be recreated, so we wait just over 2 scrape intervals
	// before creating them.
	// If the context is cancelled, we presume the server is shutting down
	// and will restart where it was. We do not attempt to write stale markers
	// in this case.

	if last.IsZero() {
		// There never was a scrape, so there will be no stale markers.
		return
	}

	// Wait for when the next scrape would have been, record its timestamp.
	var staleTime time.Time
	select {
	case <-sl.ctx.Done():
		return
	case <-ticker.C:
		staleTime = time.Now()
	}

	// Wait for when the next scrape would have been, if the target was recreated
	// samples should have been ingested by now.
	select {
	case <-sl.ctx.Done():
		return
	case <-ticker.C:
	}

	// Wait for an extra 10% of the interval, just to be safe.
	select {
	case <-sl.ctx.Done():
		return
	case <-time.After(interval / 10):
	}

	// Call sl.append again with an empty scrape to trigger stale markers.
	// If the target has since been recreated and scraped, the
	// stale markers will be out of order and ignored.
	if _, _, err := sl.append([]byte{}, staleTime); err != nil {
		log.With("err", err).Error("stale append failed")
	}
}
// Stop the scraping. May still write data and stale markers after it has
// returned. Cancel the context to stop all writes.
func (sl *scrapeLoop) stop() {
// Cancel only the scrape context; the parent context stays alive.
sl.cancel()
// Block until run has closed the channel, i.e. no further scrape will start.
<-sl.stopped
}
// sample is a single value of a series at a point in time.
type sample struct {
// metric is the label set identifying the series.
metric labels.Labels
// t is the sample timestamp. NOTE(review): presumably milliseconds since
// the epoch, as produced by timestamp.FromTime — confirm.
t int64
// v is the sample value.
v float64
}
// samples is a list of sample values. Together with Less it provides the
// Len/Swap/Less method set used for sorting.
type samples []sample
// Len returns the number of samples.
func (s samples) Len() int { return len(s) }
// Swap exchanges the samples at positions i and j.
func (s samples) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// Less orders samples by label set first and, for equal label sets, by
// ascending timestamp.
func (s samples) Less(i, j int) bool {
	if d := labels.Compare(s[i].metric, s[j].metric); d != 0 {
		return d < 0
	}
	return s[i].t < s[j].t
}
// append parses the exposition payload b and appends every sample to
// storage, using ts as the timestamp for samples that carry none. Afterwards
// it writes a stale marker at ts for every series that was tracked in the
// previous successful append but is absent from this one.
//
// It returns the number of samples parsed (total) and the number appended
// (added). On any parse or unexpected storage error the whole batch is
// rolled back and added is 0. Dropped series, out-of-order samples and
// duplicate samples are skipped without failing the batch, regardless of
// whether they are hit via the cached-ref path or the full-label-set path.
//
// Samples with an explicit timestamp are never tracked for staleness.
func (sl *scrapeLoop) append(b []byte, ts time.Time) (total, added int, err error) {
	var (
		app            = sl.appender()
		p              = textparse.New(b)
		defTime        = timestamp.FromTime(ts)
		samplesScraped = map[string]labels.Labels{}
	)

loop:
	for p.Next() {
		total++

		t := defTime
		met, tp, v := p.At()
		if tp != nil {
			t = *tp
		}

		// mets aliases the parser's buffer; it is only safe for the lookup.
		mets := yoloString(met)
		ref, ok := sl.refCache[mets]
		if ok {
			switch err = app.AddFast(ref, t, v); err {
			case nil:
				if tp == nil {
					// Bypass staleness logic if there is an explicit timestamp.
					entry := sl.lsetCache[ref]
					samplesScraped[entry.str] = entry.lset
				}
			case storage.ErrNotFound:
				// The cached ref is no longer valid; fall back to a full add.
				ok = false
			case errSeriesDropped:
				// Clear the error so that it cannot leak out of the loop
				// when this is the last sample of the scrape.
				err = nil
				continue
			case storage.ErrOutOfOrderSample:
				err = nil
				log.With("timeseries", string(met)).Warn("Out of order sample")
				continue
			case storage.ErrDuplicateSampleForTimestamp:
				err = nil
				log.With("timeseries", string(met)).Warn("Duplicate sample for timestamp")
				continue
			default:
				break loop
			}
		}
		if !ok {
			var lset labels.Labels
			p.Metric(&lset)

			ref, err = app.Add(lset, t, v)
			// TODO(fabxc): also add a dropped-cache?
			switch err {
			case nil:
			case errSeriesDropped:
				err = nil
				continue
			case storage.ErrOutOfOrderSample:
				err = nil
				log.With("timeseries", string(met)).Warn("Out of order sample")
				continue
			case storage.ErrDuplicateSampleForTimestamp:
				err = nil
				log.With("timeseries", string(met)).Warn("Duplicate sample for timestamp")
				continue
			default:
				break loop
			}
			// Allocate a real string.
			mets = string(met)
			sl.refCache[mets] = ref
			str := lset.String()
			sl.lsetCache[ref] = lsetCacheEntry{lset: lset, str: str}
			if tp == nil {
				// Bypass staleness logic if there is an explicit timestamp.
				samplesScraped[str] = lset
			}
		}
		added++
	}
	if err == nil {
		err = p.Err()
	}
	if err == nil {
	staleLoop:
		for metric, lset := range sl.samplesInPreviousScrape {
			if _, ok := samplesScraped[metric]; ok {
				continue
			}
			// Sample no longer exposed, mark it stale.
			_, err = app.Add(lset, defTime, math.Float64frombits(value.StaleNaN))
			switch err {
			case nil:
			case errSeriesDropped:
				err = nil
			case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
				// Do not log here, as this is expected if a target goes away and comes back
				// again with a new scrape loop.
				err = nil
			default:
				// Stop at the first unexpected error so that it is not
				// overwritten by a later iteration and triggers the rollback.
				break staleLoop
			}
		}
	}
	if err != nil {
		app.Rollback()
		return total, 0, err
	}
	if err := app.Commit(); err != nil {
		return total, 0, err
	}

	// Only a committed scrape becomes the baseline for the next staleness check.
	sl.samplesInPreviousScrape = samplesScraped

	return total, added, nil
}
// yoloString reinterprets b as a string without copying. The result shares
// b's backing memory, so it must not be retained once b may be modified.
func yoloString(b []byte) string {
	header := unsafe.Pointer(&b)
	return *(*string)(header)
}
// report forwards the scrape outcome to the scraper and writes the four
// per-scrape report samples (health, duration, samples scraped, samples
// appended) at the scrape's start time. Health is 1 when err is nil and 0
// otherwise. If any sample cannot be added, the batch is rolled back and that
// error is returned; otherwise the result of the commit is returned.
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, appended int, err error) error {
	sl.scraper.report(start, duration, err)

	ts := timestamp.FromTime(start)

	health := 0.0
	if err == nil {
		health = 1
	}

	app := sl.reportAppender()

	reportSamples := []struct {
		name  string
		value float64
	}{
		{scrapeHealthMetricName, health},
		{scrapeDurationMetricName, duration.Seconds()},
		{scrapeSamplesMetricName, float64(scraped)},
		{samplesPostRelabelMetricName, float64(appended)},
	}
	for _, rs := range reportSamples {
		if addErr := sl.addReportSample(app, rs.name, ts, rs.value); addErr != nil {
			app.Rollback()
			return addErr
		}
	}
	return app.Commit()
}
// addReportSample appends value v at time t for the report metric named s.
// A cached series reference is tried first; if there is none, or storage no
// longer knows it, the sample is added by label set and the returned
// reference is cached under s.
func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v float64) error {
	if cachedRef, found := sl.refCache[s]; found {
		switch fastErr := app.AddFast(cachedRef, t, v); fastErr {
		case nil:
			return nil
		case storage.ErrNotFound:
			// Stale reference; fall through to the full add below.
		default:
			return fastErr
		}
	}

	lset := labels.Labels{
		labels.Label{Name: labels.MetricName, Value: s},
	}
	newRef, err := app.Add(lset, t, v)
	if err != nil {
		return err
	}

	sl.refCache[s] = newRef
	return nil
}