Add honor_timestamps (#5304)

Fixes #5302

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
This commit is contained in:
Julien Pivotto 2019-03-15 11:04:15 +01:00 committed by Brian Brazil
parent 8155cc4992
commit 4397916cb2
6 changed files with 223 additions and 66 deletions

View File

@ -82,9 +82,10 @@ var (
DefaultScrapeConfig = ScrapeConfig{ DefaultScrapeConfig = ScrapeConfig{
// ScrapeTimeout and ScrapeInterval default to the // ScrapeTimeout and ScrapeInterval default to the
// configured globals. // configured globals.
MetricsPath: "/metrics", MetricsPath: "/metrics",
Scheme: "http", Scheme: "http",
HonorLabels: false, HonorLabels: false,
HonorTimestamps: true,
} }
// DefaultAlertmanagerConfig is the default alertmanager configuration. // DefaultAlertmanagerConfig is the default alertmanager configuration.
@ -334,6 +335,8 @@ type ScrapeConfig struct {
JobName string `yaml:"job_name"` JobName string `yaml:"job_name"`
// Indicator whether the scraped metrics should remain unmodified. // Indicator whether the scraped metrics should remain unmodified.
HonorLabels bool `yaml:"honor_labels,omitempty"` HonorLabels bool `yaml:"honor_labels,omitempty"`
// Indicator whether the scraped timestamps should be respected.
HonorTimestamps bool `yaml:"honor_timestamps"`
// A set of query parameters with which the target is scraped. // A set of query parameters with which the target is scraped.
Params url.Values `yaml:"params,omitempty"` Params url.Values `yaml:"params,omitempty"`
// How frequently to scrape the targets of this scrape config. // How frequently to scrape the targets of this scrape config.

View File

@ -121,9 +121,10 @@ var expectedConf = &Config{
{ {
JobName: "prometheus", JobName: "prometheus",
HonorLabels: true, HonorLabels: true,
ScrapeInterval: model.Duration(15 * time.Second), HonorTimestamps: true,
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath, MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme, Scheme: DefaultScrapeConfig.Scheme,
@ -193,9 +194,10 @@ var expectedConf = &Config{
JobName: "service-x", JobName: "service-x",
ScrapeInterval: model.Duration(50 * time.Second), HonorTimestamps: true,
ScrapeTimeout: model.Duration(5 * time.Second), ScrapeInterval: model.Duration(50 * time.Second),
SampleLimit: 1000, ScrapeTimeout: model.Duration(5 * time.Second),
SampleLimit: 1000,
HTTPClientConfig: config_util.HTTPClientConfig{ HTTPClientConfig: config_util.HTTPClientConfig{
BasicAuth: &config_util.BasicAuth{ BasicAuth: &config_util.BasicAuth{
@ -282,8 +284,9 @@ var expectedConf = &Config{
{ {
JobName: "service-y", JobName: "service-y",
ScrapeInterval: model.Duration(15 * time.Second), HonorTimestamps: true,
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath, MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme, Scheme: DefaultScrapeConfig.Scheme,
@ -324,8 +327,9 @@ var expectedConf = &Config{
{ {
JobName: "service-z", JobName: "service-z",
ScrapeInterval: model.Duration(15 * time.Second), HonorTimestamps: true,
ScrapeTimeout: model.Duration(10 * time.Second), ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: model.Duration(10 * time.Second),
MetricsPath: "/metrics", MetricsPath: "/metrics",
Scheme: "http", Scheme: "http",
@ -342,8 +346,9 @@ var expectedConf = &Config{
{ {
JobName: "service-kubernetes", JobName: "service-kubernetes",
ScrapeInterval: model.Duration(15 * time.Second), HonorTimestamps: true,
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath, MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme, Scheme: DefaultScrapeConfig.Scheme,
@ -371,8 +376,9 @@ var expectedConf = &Config{
{ {
JobName: "service-kubernetes-namespaces", JobName: "service-kubernetes-namespaces",
ScrapeInterval: model.Duration(15 * time.Second), HonorTimestamps: true,
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath, MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme, Scheme: DefaultScrapeConfig.Scheme,
@ -400,8 +406,9 @@ var expectedConf = &Config{
{ {
JobName: "service-marathon", JobName: "service-marathon",
ScrapeInterval: model.Duration(15 * time.Second), HonorTimestamps: true,
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath, MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme, Scheme: DefaultScrapeConfig.Scheme,
@ -427,8 +434,9 @@ var expectedConf = &Config{
{ {
JobName: "service-ec2", JobName: "service-ec2",
ScrapeInterval: model.Duration(15 * time.Second), HonorTimestamps: true,
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath, MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme, Scheme: DefaultScrapeConfig.Scheme,
@ -459,8 +467,9 @@ var expectedConf = &Config{
{ {
JobName: "service-azure", JobName: "service-azure",
ScrapeInterval: model.Duration(15 * time.Second), HonorTimestamps: true,
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath, MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme, Scheme: DefaultScrapeConfig.Scheme,
@ -483,8 +492,9 @@ var expectedConf = &Config{
{ {
JobName: "service-nerve", JobName: "service-nerve",
ScrapeInterval: model.Duration(15 * time.Second), HonorTimestamps: true,
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath, MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme, Scheme: DefaultScrapeConfig.Scheme,
@ -502,8 +512,9 @@ var expectedConf = &Config{
{ {
JobName: "0123service-xxx", JobName: "0123service-xxx",
ScrapeInterval: model.Duration(15 * time.Second), HonorTimestamps: true,
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath, MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme, Scheme: DefaultScrapeConfig.Scheme,
@ -519,11 +530,33 @@ var expectedConf = &Config{
}, },
}, },
}, },
{
JobName: "badfederation",
HonorTimestamps: false,
ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: "/federate",
Scheme: DefaultScrapeConfig.Scheme,
ServiceDiscoveryConfig: sd_config.ServiceDiscoveryConfig{
StaticConfigs: []*targetgroup.Group{
{
Targets: []model.LabelSet{
{model.AddressLabel: "localhost:9090"},
},
Source: "0",
},
},
},
},
{ {
JobName: "測試", JobName: "測試",
ScrapeInterval: model.Duration(15 * time.Second), HonorTimestamps: true,
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath, MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme, Scheme: DefaultScrapeConfig.Scheme,
@ -542,8 +575,9 @@ var expectedConf = &Config{
{ {
JobName: "service-triton", JobName: "service-triton",
ScrapeInterval: model.Duration(15 * time.Second), HonorTimestamps: true,
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath, MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme, Scheme: DefaultScrapeConfig.Scheme,
@ -569,8 +603,9 @@ var expectedConf = &Config{
{ {
JobName: "service-openstack", JobName: "service-openstack",
ScrapeInterval: model.Duration(15 * time.Second), HonorTimestamps: true,
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout, ScrapeInterval: model.Duration(15 * time.Second),
ScrapeTimeout: DefaultGlobalConfig.ScrapeTimeout,
MetricsPath: DefaultScrapeConfig.MetricsPath, MetricsPath: DefaultScrapeConfig.MetricsPath,
Scheme: DefaultScrapeConfig.Scheme, Scheme: DefaultScrapeConfig.Scheme,

View File

@ -230,6 +230,13 @@ scrape_configs:
- targets: - targets:
- localhost:9090 - localhost:9090
- job_name: badfederation
honor_timestamps: false
metrics_path: /federate
static_configs:
- targets:
- localhost:9090
- job_name: 測試 - job_name: 測試
metrics_path: /metrics metrics_path: /metrics
static_configs: static_configs:

View File

@ -135,6 +135,16 @@ job_name: <job_name>
# when a time series does not have a given label yet and are ignored otherwise. # when a time series does not have a given label yet and are ignored otherwise.
[ honor_labels: <boolean> | default = false ] [ honor_labels: <boolean> | default = false ]
# honor_timestamps controls whether Prometheus respects the timestamps present
# in scraped data.
#
# If honor_timestamps is set to "true", the timestamps of the metrics exposed
# by the target will be used.
#
# If honor_timestamps is set to "false", the timestamps of the metrics exposed
# by the target will be ignored.
[ honor_timestamps: <boolean> | default = true ]
# Configures the protocol scheme used for requests. # Configures the protocol scheme used for requests.
[ scheme: <scheme> | default = http ] [ scheme: <scheme> | default = http ]

View File

@ -162,11 +162,12 @@ type scrapePool struct {
} }
type scrapeLoopOptions struct { type scrapeLoopOptions struct {
target *Target target *Target
scraper scraper scraper scraper
limit int limit int
honorLabels bool honorLabels bool
mrc []*relabel.Config honorTimestamps bool
mrc []*relabel.Config
} }
const maxAheadTime = 10 * time.Minute const maxAheadTime = 10 * time.Minute
@ -220,6 +221,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, jitterSeed uint64,
}, },
cache, cache,
jitterSeed, jitterSeed,
opts.honorTimestamps,
) )
} }
@ -284,12 +286,13 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
sp.client = client sp.client = client
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
interval = time.Duration(sp.config.ScrapeInterval) interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout) timeout = time.Duration(sp.config.ScrapeTimeout)
limit = int(sp.config.SampleLimit) limit = int(sp.config.SampleLimit)
honor = sp.config.HonorLabels honorLabels = sp.config.HonorLabels
mrc = sp.config.MetricRelabelConfigs honorTimestamps = sp.config.HonorTimestamps
mrc = sp.config.MetricRelabelConfigs
) )
for fp, oldLoop := range sp.loops { for fp, oldLoop := range sp.loops {
@ -297,11 +300,12 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
t = sp.activeTargets[fp] t = sp.activeTargets[fp]
s = &targetScraper{Target: t, client: sp.client, timeout: timeout} s = &targetScraper{Target: t, client: sp.client, timeout: timeout}
newLoop = sp.newLoop(scrapeLoopOptions{ newLoop = sp.newLoop(scrapeLoopOptions{
target: t, target: t,
scraper: s, scraper: s,
limit: limit, limit: limit,
honorLabels: honor, honorLabels: honorLabels,
mrc: mrc, honorTimestamps: honorTimestamps,
mrc: mrc,
}) })
) )
wg.Add(1) wg.Add(1)
@ -362,12 +366,13 @@ func (sp *scrapePool) sync(targets []*Target) {
defer sp.mtx.Unlock() defer sp.mtx.Unlock()
var ( var (
uniqueTargets = map[uint64]struct{}{} uniqueTargets = map[uint64]struct{}{}
interval = time.Duration(sp.config.ScrapeInterval) interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout) timeout = time.Duration(sp.config.ScrapeTimeout)
limit = int(sp.config.SampleLimit) limit = int(sp.config.SampleLimit)
honor = sp.config.HonorLabels honorLabels = sp.config.HonorLabels
mrc = sp.config.MetricRelabelConfigs honorTimestamps = sp.config.HonorTimestamps
mrc = sp.config.MetricRelabelConfigs
) )
for _, t := range targets { for _, t := range targets {
@ -378,11 +383,12 @@ func (sp *scrapePool) sync(targets []*Target) {
if _, ok := sp.activeTargets[hash]; !ok { if _, ok := sp.activeTargets[hash]; !ok {
s := &targetScraper{Target: t, client: sp.client, timeout: timeout} s := &targetScraper{Target: t, client: sp.client, timeout: timeout}
l := sp.newLoop(scrapeLoopOptions{ l := sp.newLoop(scrapeLoopOptions{
target: t, target: t,
scraper: s, scraper: s,
limit: limit, limit: limit,
honorLabels: honor, honorLabels: honorLabels,
mrc: mrc, honorTimestamps: honorTimestamps,
mrc: mrc,
}) })
sp.activeTargets[hash] = t sp.activeTargets[hash] = t
@ -576,12 +582,13 @@ type cacheEntry struct {
} }
type scrapeLoop struct { type scrapeLoop struct {
scraper scraper scraper scraper
l log.Logger l log.Logger
cache *scrapeCache cache *scrapeCache
lastScrapeSize int lastScrapeSize int
buffers *pool.Pool buffers *pool.Pool
jitterSeed uint64 jitterSeed uint64
honorTimestamps bool
appender func() storage.Appender appender func() storage.Appender
sampleMutator labelsMutator sampleMutator labelsMutator
@ -801,6 +808,7 @@ func newScrapeLoop(ctx context.Context,
appender func() storage.Appender, appender func() storage.Appender,
cache *scrapeCache, cache *scrapeCache,
jitterSeed uint64, jitterSeed uint64,
honorTimestamps bool,
) *scrapeLoop { ) *scrapeLoop {
if l == nil { if l == nil {
l = log.NewNopLogger() l = log.NewNopLogger()
@ -822,6 +830,7 @@ func newScrapeLoop(ctx context.Context,
jitterSeed: jitterSeed, jitterSeed: jitterSeed,
l: l, l: l,
ctx: ctx, ctx: ctx,
honorTimestamps: honorTimestamps,
} }
sl.scrapeCtx, sl.cancel = context.WithCancel(ctx) sl.scrapeCtx, sl.cancel = context.WithCancel(ctx)
@ -1039,6 +1048,9 @@ loop:
t := defTime t := defTime
met, tp, v := p.Series() met, tp, v := p.Series()
if !sl.honorTimestamps {
tp = nil
}
if tp != nil { if tp != nil {
t = *tp t = *tp
} }

View File

@ -396,6 +396,7 @@ func TestScrapeLoopStopBeforeRun(t *testing.T) {
nopMutator, nopMutator,
nopMutator, nopMutator,
nil, nil, 0, nil, nil, 0,
true,
) )
// The scrape pool synchronizes on stopping scrape loops. However, new scrape // The scrape pool synchronizes on stopping scrape loops. However, new scrape
@ -460,6 +461,7 @@ func TestScrapeLoopStop(t *testing.T) {
app, app,
nil, nil,
0, 0,
true,
) )
// Terminate loop after 2 scrapes. // Terminate loop after 2 scrapes.
@ -526,6 +528,7 @@ func TestScrapeLoopRun(t *testing.T) {
app, app,
nil, nil,
0, 0,
true,
) )
// The loop must terminate during the initial offset if the context // The loop must terminate during the initial offset if the context
@ -572,6 +575,7 @@ func TestScrapeLoopRun(t *testing.T) {
app, app,
nil, nil,
0, 0,
true,
) )
go func() { go func() {
@ -621,6 +625,7 @@ func TestScrapeLoopMetadata(t *testing.T) {
func() storage.Appender { return nopAppender{} }, func() storage.Appender { return nopAppender{} },
cache, cache,
0, 0,
true,
) )
defer cancel() defer cancel()
@ -671,6 +676,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
app, app,
nil, nil,
0, 0,
true,
) )
// Succeed once, several failures, then stop. // Succeed once, several failures, then stop.
numScrapes := 0 numScrapes := 0
@ -730,6 +736,7 @@ func TestScrapeLoopRunCreatesStaleMarkersOnParseFailure(t *testing.T) {
app, app,
nil, nil,
0, 0,
true,
) )
// Succeed once, several failures, then stop. // Succeed once, several failures, then stop.
@ -835,6 +842,7 @@ func TestScrapeLoopAppend(t *testing.T) {
func() storage.Appender { return app }, func() storage.Appender { return app },
nil, nil,
0, 0,
true,
) )
now := time.Now() now := time.Now()
@ -875,6 +883,7 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
func() storage.Appender { return app }, func() storage.Appender { return app },
nil, nil,
0, 0,
true,
) )
// Get the value of the Counter before performing the append. // Get the value of the Counter before performing the append.
@ -936,6 +945,7 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
func() storage.Appender { return capp }, func() storage.Appender { return capp },
nil, nil,
0, 0,
true,
) )
now := time.Now() now := time.Now()
@ -976,6 +986,7 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
func() storage.Appender { return app }, func() storage.Appender { return app },
nil, nil,
0, 0,
true,
) )
now := time.Now() now := time.Now()
@ -1022,6 +1033,7 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
func() storage.Appender { return app }, func() storage.Appender { return app },
nil, nil,
0, 0,
true,
) )
now := time.Now() now := time.Now()
@ -1062,6 +1074,7 @@ func TestScrapeLoopRunReportsTargetDownOnScrapeError(t *testing.T) {
app, app,
nil, nil,
0, 0,
true,
) )
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@ -1092,6 +1105,7 @@ func TestScrapeLoopRunReportsTargetDownOnInvalidUTF8(t *testing.T) {
app, app,
nil, nil,
0, 0,
true,
) )
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error { scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
@ -1139,6 +1153,7 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
func() storage.Appender { return app }, func() storage.Appender { return app },
nil, nil,
0, 0,
true,
) )
now := time.Unix(1, 0) now := time.Unix(1, 0)
@ -1173,6 +1188,7 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
}, },
nil, nil,
0, 0,
true,
) )
now := time.Now().Add(20 * time.Minute) now := time.Now().Add(20 * time.Minute)
@ -1353,3 +1369,77 @@ func (ts *testScraper) scrape(ctx context.Context, w io.Writer) (string, error)
} }
return "", ts.scrapeErr return "", ts.scrapeErr
} }
// TestScrapeLoop_RespectTimestamps verifies that when honorTimestamps is
// enabled, the timestamp exposed by the scraped data (here the explicit 0
// at the end of the exposition line) is kept on the appended sample instead
// of being replaced by the scrape time.
func TestScrapeLoop_RespectTimestamps(t *testing.T) {
	s := testutil.NewStorage(t)
	defer s.Close()

	app, err := s.Appender()
	if err != nil {
		// Fatal, not Error: continuing with a nil appender would panic below.
		t.Fatal(err)
	}
	capp := &collectResultAppender{next: app}

	// honorTimestamps = true: the loop must keep timestamps from the scrape.
	sl := newScrapeLoop(context.Background(),
		nil, nil, nil,
		nopMutator,
		nopMutator,
		func() storage.Appender { return capp },
		nil, 0,
		true,
	)

	now := time.Now()
	_, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now)
	if err != nil {
		t.Fatalf("Unexpected append error: %s", err)
	}

	// The exposed timestamp (0) must survive, not the scrape time `now`.
	want := []sample{
		{
			metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
			t:      0,
			v:      1,
		},
	}
	if !reflect.DeepEqual(want, capp.result) {
		t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, capp.result)
	}
}
// TestScrapeLoop_DiscardTimestamps verifies that when honorTimestamps is
// disabled, the timestamp exposed by the scraped data (the explicit 0 in the
// exposition line) is discarded and the sample is stamped with the scrape
// time instead.
func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
	s := testutil.NewStorage(t)
	defer s.Close()

	app, err := s.Appender()
	if err != nil {
		// Fatal, not Error: continuing with a nil appender would panic below.
		t.Fatal(err)
	}
	capp := &collectResultAppender{next: app}

	// honorTimestamps = false: the loop must overwrite scraped timestamps.
	sl := newScrapeLoop(context.Background(),
		nil, nil, nil,
		nopMutator,
		nopMutator,
		func() storage.Appender { return capp },
		nil, 0,
		false,
	)

	now := time.Now()
	_, _, err = sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now)
	if err != nil {
		t.Fatalf("Unexpected append error: %s", err)
	}

	// The sample must carry the scrape time, not the exposed timestamp (0).
	want := []sample{
		{
			metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
			t:      timestamp.FromTime(now),
			v:      1,
		},
	}
	if !reflect.DeepEqual(want, capp.result) {
		t.Fatalf("Appended samples not as expected. Wanted: %+v Got: %+v", want, capp.result)
	}
}