Merge pull request #17629 from prometheus/bwplotka/a2-tsdb

refactor(appenderV2)[PART1]: add AppenderV2 interface; add TSDB AppenderV2 implementation
Bartlomiej Plotka 2025-12-09 11:41:00 +00:00 committed by GitHub
commit be419d80dc
15 changed files with 13464 additions and 179 deletions


@ -1,4 +1,4 @@
// Copyright 2022 The Prometheus Authors
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -13,7 +13,11 @@
package metadata
import "github.com/prometheus/common/model"
import (
"strings"
"github.com/prometheus/common/model"
)
// Metadata stores a series' metadata information.
type Metadata struct {
@ -21,3 +25,21 @@ type Metadata struct {
Unit string `json:"unit"`
Help string `json:"help"`
}
// IsEmpty returns true if the metadata structure is empty; an unknown type counts as empty.
func (m Metadata) IsEmpty() bool {
return (m.Type == "" || m.Type == model.MetricTypeUnknown) && m.Unit == "" && m.Help == ""
}
// Equals returns true if m is semantically the same as other metadata.
func (m Metadata) Equals(other Metadata) bool {
if strings.Compare(m.Unit, other.Unit) != 0 || strings.Compare(m.Help, other.Help) != 0 {
return false
}
// Unknown means the same as empty string.
if m.Type == "" || m.Type == model.MetricTypeUnknown {
return other.Type == "" || other.Type == model.MetricTypeUnknown
}
return m.Type == other.Type
}


@ -0,0 +1,116 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metadata
import (
"testing"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
func TestMetadata_IsEmpty(t *testing.T) {
for _, tt := range []struct {
name string
m Metadata
expected bool
}{
{
name: "empty struct", expected: true,
},
{
name: "unknown type with empty fields", expected: true,
m: Metadata{Type: model.MetricTypeUnknown},
},
{
name: "type", expected: false,
m: Metadata{Type: model.MetricTypeCounter},
},
{
name: "unit", expected: false,
m: Metadata{Unit: "seconds"},
},
{
name: "help", expected: false,
m: Metadata{Help: "help text"},
},
{
name: "unknown type with help", expected: false,
m: Metadata{Type: model.MetricTypeUnknown, Help: "help text"},
},
} {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.expected, tt.m.IsEmpty())
})
}
}
func TestMetadata_Equals(t *testing.T) {
for _, tt := range []struct {
name string
m Metadata
other Metadata
expected bool
}{
{
name: "same empty", expected: true,
},
{
name: "same", expected: true,
m: Metadata{Type: model.MetricTypeCounter, Unit: "s", Help: "doc"},
other: Metadata{Type: model.MetricTypeCounter, Unit: "s", Help: "doc"},
},
{
name: "same unknown type", expected: true,
m: Metadata{Type: model.MetricTypeUnknown, Unit: "s", Help: "doc"},
other: Metadata{Type: model.MetricTypeUnknown, Unit: "s", Help: "doc"},
},
{
name: "same mixed unknown type", expected: true,
m: Metadata{Type: "", Unit: "s", Help: "doc"},
other: Metadata{Type: model.MetricTypeUnknown, Unit: "s", Help: "doc"},
},
{
name: "different unit", expected: false,
m: Metadata{Type: model.MetricTypeCounter, Unit: "s", Help: "doc"},
other: Metadata{Type: model.MetricTypeCounter, Unit: "bytes", Help: "doc"},
},
{
name: "different help", expected: false,
m: Metadata{Type: model.MetricTypeCounter, Unit: "s", Help: "doc"},
other: Metadata{Type: model.MetricTypeCounter, Unit: "s", Help: "other doc"},
},
{
name: "different type", expected: false,
m: Metadata{Type: model.MetricTypeCounter, Unit: "s", Help: "doc"},
other: Metadata{Type: model.MetricTypeGauge, Unit: "s", Help: "doc"},
},
{
name: "different type with unknown", expected: false,
m: Metadata{Type: model.MetricTypeUnknown, Unit: "s", Help: "doc"},
other: Metadata{Type: model.MetricTypeCounter, Unit: "s", Help: "doc"},
},
{
name: "different type with empty", expected: false,
m: Metadata{Type: "", Unit: "s", Help: "doc"},
other: Metadata{Type: model.MetricTypeCounter, Unit: "s", Help: "doc"},
},
} {
t.Run(tt.name, func(t *testing.T) {
if got := tt.m.Equals(tt.other); got != tt.expected {
t.Errorf("Metadata.Equals() = %v, expected %v", got, tt.expected)
}
})
}
}


@ -49,6 +49,7 @@ var (
// NOTE(bwplotka): This can be either an instrumentation failure or commonly expected
// behaviour, and we currently don't have a way to determine which. As a result
// it's recommended to ignore this error for now.
// TODO(bwplotka): Remove with appender v1 flow; not used in v2.
ErrOutOfOrderST = errors.New("start timestamp out of order, ignoring")
ErrSTNewerThanSample = errors.New("ST is newer or the same as sample's timestamp, ignoring")
)
@ -58,11 +59,14 @@ var (
// their own reference types.
type SeriesRef uint64
// Appendable allows creating appenders.
// Appendable allows creating an Appender.
//
// WARNING: Work on AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
type Appendable interface {
// Appender returns a new appender for the storage. The implementation
// can choose whether or not to use the context, for deadlines or to check
// for errors.
// Appender returns a new appender for the storage.
//
// Implementations CAN choose whether to use the context e.g. for deadlines,
// but it's not mandatory.
Appender(ctx context.Context) Appender
}
@ -255,7 +259,13 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) {
return f(mint, maxt)
}
// AppendOptions provides options for implementations of the Appender interface.
//
// WARNING: Work on AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
type AppendOptions struct {
// DiscardOutOfOrder tells the implementation that this append should not be out
// of order. An OOO append MUST be rejected with a storage.ErrOutOfOrderSample
// error.
DiscardOutOfOrder bool
}
@ -267,7 +277,11 @@ type AppendOptions struct {
// The order of samples appended via the Appender is preserved within each
// series. I.e. samples are not reordered per timestamp, or by float/histogram
// type.
//
// WARNING: Work on AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
type Appender interface {
AppenderTransaction
// Append adds a sample pair for the given series.
// An optional series reference can be provided to accelerate calls.
// A series reference number is returned which can be used to add further
@ -278,16 +292,6 @@ type Appender interface {
// If the reference is 0 it must not be used for caching.
Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error)
// Commit submits the collected samples and purges the batch. If Commit
// returns a non-nil error, it also rolls back all modifications made in
// the appender so far, as Rollback would do. In any case, an Appender
// must not be used anymore after Commit has been called.
Commit() error
// Rollback rolls back all modifications made in the appender so far.
// Appender has to be discarded after rollback.
Rollback() error
// SetOptions configures the appender with specific append options such as
// discarding out-of-order samples even if out-of-order is enabled in the TSDB.
SetOptions(opts *AppendOptions)
@ -301,8 +305,8 @@ type Appender interface {
// GetRef is an extra interface on Appenders used by downstream projects
// (e.g. Cortex) to avoid maintaining a parallel set of references.
type GetRef interface {
// Returns reference number that can be used to pass to Appender.Append(),
// and a set of labels that will not cause another copy when passed to Appender.Append().
// GetRef returns a reference number that can be passed to AppenderV2.Append(),
// and a set of labels that will not cause another copy when passed to AppenderV2.Append().
// 0 means the appender does not have a reference to this series.
// hash should be a hash of lset.
GetRef(lset labels.Labels, hash uint64) (SeriesRef, labels.Labels)
@ -310,6 +314,8 @@ type GetRef interface {
// ExemplarAppender provides an interface for adding samples to exemplar storage, which
// within Prometheus is in-memory only.
//
// WARNING: Work on AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
type ExemplarAppender interface {
// AppendExemplar adds an exemplar for the given series labels.
// An optional reference number can be provided to accelerate calls.
@ -326,6 +332,8 @@ type ExemplarAppender interface {
}
// HistogramAppender provides an interface for appending histograms to the storage.
//
// WARNING: Work on AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
type HistogramAppender interface {
// AppendHistogram adds a histogram for the given series labels. An
// optional reference number can be provided to accelerate calls. A
@ -356,6 +364,8 @@ type HistogramAppender interface {
}
// MetadataUpdater provides an interface for associating metadata to stored series.
//
// WARNING: Work on AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
type MetadataUpdater interface {
// UpdateMetadata updates a metadata entry for the given series and labels.
// A series reference number is returned which can be used to modify the
@ -368,6 +378,8 @@ type MetadataUpdater interface {
}
// StartTimestampAppender provides an interface for appending ST to storage.
//
// WARNING: Work on AppendableV2 is in progress. Appendable will be removed soon (ETA: Q2 2026).
type StartTimestampAppender interface {
// AppendSTZeroSample adds synthetic zero sample for the given st timestamp,
// which will be associated with given series, labels and the incoming
@ -390,10 +402,10 @@ type SeriesSet interface {
Next() bool
// At returns full series. Returned series should be iterable even after Next is called.
At() Series
// The error that iteration has failed with.
// Err returns the error that iteration has failed with.
// When an error occurs, set cannot continue to iterate.
Err() error
// A collection of warnings for the whole set.
// Warnings returns a collection of warnings for the whole set.
// Warnings can be returned even if iteration has not failed with an error.
Warnings() annotations.Annotations
}

storage/interface_append.go (new file, 182 lines)

@ -0,0 +1,182 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"errors"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
)
// AppendableV2 allows creating an AppenderV2.
type AppendableV2 interface {
// AppenderV2 returns a new appender for the storage.
//
// Implementations CAN choose whether to use the context e.g. for deadlines,
// but it's not mandatory.
AppenderV2(ctx context.Context) AppenderV2
}
// AOptions is a shorthand for AppendV2Options.
// NOTE: The name AppendOptions is already taken by the v1 interface.
type AOptions = AppendV2Options
// AppendV2Options provides optional, auxiliary data and configuration for AppenderV2.Append.
type AppendV2Options struct {
// MetricFamilyName (optional) provides the metric family name for the appended
// sample's series. If the client of the AppenderV2 has this information
// (e.g. from scrape) it's recommended to pass it to the appender.
//
// The provided string bytes are unsafe to retain; they only live for the duration of the Append call.
//
// Some implementations use this to avoid slow and error-prone metric family detection for:
// * Per-metric-family metadata storage (e.g. Prometheus metadata WAL/API/RW1).
// * Storage with strictly complex types (e.g. OpenTelemetry Collector).
//
// NOTE(krajorama): An example use case shows up in OTLP ingestion: OTLP calculates the
// metric family name for all metrics and uses it to generate summary and
// histogram series by adding the magic suffixes. The metric family name is
// passed down to the appender in case the storage needs it for metadata updates.
// A known user of this is Mimir, which implements /api/v1/metadata and uses
// Remote-Write 1.0 for this. Might be removed later if no longer
// needed by any downstream project.
// NOTE(bwplotka): Long term, once Prometheus uses complex types on the storage level,
// MetricFamilyName can be removed as it will always equal __name__.
MetricFamilyName string
// Metadata (optional) attached to the appended sample.
// Metadata strings are safe for reuse.
// IMPORTANT: Appender v1 only provided metadata updates. This field MUST be
// set (if known) even if it didn't change since the last iteration.
// This moves the responsibility for metadata storage decisions to the TSDB.
Metadata metadata.Metadata
// Exemplars (optional) attached to the appended sample.
// The exemplar slice MUST be sorted by Exemplar.Ts.
// The exemplar slice is unsafe to retain beyond the Append call.
Exemplars []exemplar.Exemplar
// RejectOutOfOrder tells the implementation that this append should not be out
// of order. An OOO append MUST be rejected with a storage.ErrOutOfOrderSample
// error.
RejectOutOfOrder bool
}
// AppendPartialError represents an AppenderV2.Append error that tells
// callers the sample was written but some auxiliary optional data (e.g. exemplars)
// was not (or was only partially written).
//
// It's up to the caller to decide whether it's an ignorable error, and it
// allows extra reporting (e.g. for Remote Write 2.0 X-Remote-Write-Written headers).
type AppendPartialError struct {
ExemplarErrors []error
}
// Error returns combined error string.
func (e *AppendPartialError) Error() string {
errs := errors.Join(e.ExemplarErrors...)
if errs == nil {
return ""
}
return errs.Error()
}
var _ error = &AppendPartialError{}
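A caller-side sketch (not taken from the PR; the handleAppendResult helper, its package and its logging are illustrative) of how a partial failure is meant to be detected with errors.As:
package example

import (
	"errors"
	"log/slog"

	"github.com/prometheus/prometheus/storage"
)

// handleAppendResult is a hypothetical helper: it treats a partial failure
// (sample written, some exemplars dropped) as non-fatal and anything else
// as a real append error.
func handleAppendResult(logger *slog.Logger, err error) error {
	if err == nil {
		return nil
	}
	var perr *storage.AppendPartialError
	if errors.As(err, &perr) {
		// The sample itself was written; only auxiliary data failed.
		logger.Warn("partial append", "exemplar_errors", len(perr.ExemplarErrors))
		return nil
	}
	// Any other error means the sample was rejected.
	return err
}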
// AppenderV2 provides appends against a storage for all types of samples.
// It must be completed with a call to Commit or Rollback and must not be reused afterwards.
//
// Operations on the AppenderV2 interface are not goroutine-safe.
//
// The order of samples appended via the AppenderV2 is preserved within each
// series. I.e. samples are not reordered per timestamp, or by float/histogram
// type.
type AppenderV2 interface {
AppenderTransaction
// Append appends a sample and related exemplars, metadata, and start timestamp (st) to the storage.
//
// ref (optional) represents the stable ID for the given series identified by ls (excluding metadata).
// Callers MAY provide the ref to help the implementation avoid the ls -> ref computation; otherwise ref MUST be 0 (unknown).
//
// ls represents labels for the sample's series.
//
// st (optional) represents sample start timestamp. 0 means unknown. Implementations
// are responsible for any potential ST storage logic (e.g. ST zero injections).
//
// t represents sample timestamp.
//
// v, h and fh represent the sample value for each sample type.
// Callers MUST only provide one of the sample types (either v, h or fh).
// Implementations can detect the type of the sample with the following switch:
//
// switch {
// case fh != nil: It's a float histogram append.
// case h != nil: It's a histogram append.
// default: It's a float append.
// }
// TODO(bwplotka): We plan to experiment on using generics for complex sampleType, but do it after we unify interface (derisk) and before we add native summaries.
//
// Implementations MUST attempt to append the sample even if metadata, exemplar or (st) start timestamp appends fail.
// Implementations MAY return AppendPartialError as an error. Use errors.As to detect it.
// For a successful Append, implementations MUST return a valid SeriesRef that represents ls.
// NOTE(bwplotka): Given OTLP, native histograms and the relaxed requirement for
// type and unit suffixes in metric names, we start to hit cases where ls is not enough to identify
// the series (metadata matters). The current solution is to enable the 'type-and-unit-label' feature for those cases, but we may
// start to extend the id with metadata one day.
Append(ref SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts AppendV2Options) (SeriesRef, error)
}
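For orientation, a minimal usage sketch of the new interface (not taken from the PR; the metric name, labels and metadata values are illustrative, and db stands for any AppendableV2 implementation, e.g. *tsdb.DB after this change):
package example

import (
	"context"

	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/model/exemplar"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/model/metadata"
	"github.com/prometheus/prometheus/storage"
)

// writeOneSample appends a single float sample together with its metadata and
// one exemplar, then commits the batch. For simplicity any error rolls back,
// although a storage.AppendPartialError could be tolerated by the caller.
func writeOneSample(ctx context.Context, db storage.AppendableV2, t int64, v float64) error {
	app := db.AppenderV2(ctx)
	_, err := app.Append(
		0, // No cached SeriesRef yet.
		labels.FromStrings("__name__", "http_requests_total", "job", "demo"),
		0, // Start timestamp unknown.
		t, v,
		nil, nil, // Not a histogram sample.
		storage.AOptions{
			Metadata: metadata.Metadata{Type: model.MetricTypeCounter, Help: "Total HTTP requests."},
			Exemplars: []exemplar.Exemplar{
				{Labels: labels.FromStrings("trace_id", "abc123"), Value: v, Ts: t},
			},
		},
	)
	if err != nil {
		_ = app.Rollback()
		return err
	}
	return app.Commit()
}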
// AppenderTransaction allows transactional appends.
type AppenderTransaction interface {
// Commit submits the collected samples and purges the batch. If Commit
// returns a non-nil error, it also rolls back all modifications made in
// the appender so far, as Rollback would do. In any case, an Appender
// must not be used anymore after Commit has been called.
Commit() error
// Rollback rolls back all modifications made in the appender so far.
// Appender has to be discarded after rollback.
Rollback() error
}
// LimitedAppenderV1 is an Appender that only supports appending float and histogram samples.
// This is to support migration to AppenderV2.
// TODO(bwplotka): Remove once migration to AppenderV2 is fully complete.
type LimitedAppenderV1 interface {
Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error)
AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error)
}
// AppenderV2AsLimitedV1 returns an appender that exposes an AppenderV2 as a LimitedAppenderV1.
// TODO(bwplotka): Remove once migration to AppenderV2 is fully complete.
func AppenderV2AsLimitedV1(app AppenderV2) LimitedAppenderV1 {
return &limitedAppenderV1{AppenderV2: app}
}
type limitedAppenderV1 struct {
AppenderV2
}
func (a *limitedAppenderV1) Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error) {
return a.AppenderV2.Append(ref, l, 0, t, v, nil, nil, AppendV2Options{})
}
func (a *limitedAppenderV1) AppendHistogram(ref SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (SeriesRef, error) {
return a.AppenderV2.Append(ref, l, 0, t, 0, h, fh, AppendV2Options{})
}
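A hypothetical sketch (not taken from the PR) of how the adapter lets helpers written against the narrow v1 float/histogram surface run on top of an AppenderV2, which is how the shared test scenarios in this PR are reused:
package example

import (
	"context"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// appendFloats is a hypothetical helper written against the narrow v1
// surface; it does not know which appender generation backs it.
func appendFloats(app storage.LimitedAppenderV1, ls labels.Labels, ts []int64, vs []float64) error {
	var (
		ref storage.SeriesRef
		err error
	)
	for i := range ts {
		if ref, err = app.Append(ref, ls, ts[i], vs[i]); err != nil {
			return err
		}
	}
	return nil
}

// viaV2 drives the same helper through an AppenderV2 using the adapter.
func viaV2(ctx context.Context, db storage.AppendableV2, ls labels.Labels) error {
	app := db.AppenderV2(ctx)
	if err := appendFloats(storage.AppenderV2AsLimitedV1(app), ls, []int64{1, 2}, []float64{1, 2}); err != nil {
		_ = app.Rollback()
		return err
	}
	return app.Commit()
}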


@ -86,6 +86,12 @@ func (w *BlockWriter) Appender(ctx context.Context) storage.Appender {
return w.head.Appender(ctx)
}
// AppenderV2 returns a new appender on the database.
// AppenderV2 can't be called concurrently. However, the returned AppenderV2 can safely be used concurrently.
func (w *BlockWriter) AppenderV2(ctx context.Context) storage.AppenderV2 {
return w.head.AppenderV2(ctx)
}
// Flush implements the Writer interface. This is where actual block writing
// happens. After flush completes, no writes can be done.
func (w *BlockWriter) Flush(ctx context.Context) (ulid.ULID, error) {


@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
)
@ -59,3 +60,37 @@ func TestBlockWriter(t *testing.T) {
require.NoError(t, w.Close())
}
func TestBlockWriter_AppenderV2(t *testing.T) {
ctx := context.Background()
outputDir := t.TempDir()
w, err := NewBlockWriter(promslog.NewNopLogger(), outputDir, DefaultBlockDuration)
require.NoError(t, err)
// Add some series.
app := w.AppenderV2(ctx)
ts1, v1 := int64(44), float64(7)
_, err = app.Append(0, labels.FromStrings("a", "b"), 0, ts1, v1, nil, nil, storage.AOptions{})
require.NoError(t, err)
ts2, v2 := int64(55), float64(12)
_, err = app.Append(0, labels.FromStrings("c", "d"), 0, ts2, v2, nil, nil, storage.AOptions{})
require.NoError(t, err)
require.NoError(t, app.Commit())
id, err := w.Flush(ctx)
require.NoError(t, err)
// Confirm the block has the correct data.
blockpath := filepath.Join(outputDir, id.String())
b, err := OpenBlock(nil, blockpath, nil, nil)
require.NoError(t, err)
defer func() { require.NoError(t, b.Close()) }()
q, err := NewBlockQuerier(b, math.MinInt64, math.MaxInt64)
require.NoError(t, err)
series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))
sample1 := []chunks.Sample{sample{t: ts1, f: v1}}
sample2 := []chunks.Sample{sample{t: ts2, f: v2}}
expectedSeries := map[string][]chunks.Sample{"{a=\"b\"}": sample1, "{c=\"d\"}": sample2}
require.Equal(t, expectedSeries, series)
require.NoError(t, w.Close())
}


@ -220,6 +220,20 @@ type Options struct {
// UseUncachedIO allows bypassing the page cache when appropriate.
UseUncachedIO bool
// EnableSTAsZeroSample represents 'created-timestamp-zero-ingestion' feature flag.
// If true, ST, if non-zero and earlier than sample timestamp, will be stored
// as a zero sample before the actual sample.
//
// The zero sample is best-effort; only a debug log is emitted on failure.
// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
// is implemented.
EnableSTAsZeroSample bool
// EnableMetadataWALRecords represents 'metadata-wal-records' feature flag.
// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
// is implemented.
EnableMetadataWALRecords bool
// BlockCompactionExcludeFunc is a function which returns true for blocks that should NOT be compacted.
// It's passed down to the TSDB compactor.
BlockCompactionExcludeFunc BlockExcludeFilterFunc
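For orientation, a sketch (not taken from the PR) of how the two new flags would be enabled when building TSDB options; only the two fields come from this change, the rest is standard DefaultOptions usage:
package example

import "github.com/prometheus/prometheus/tsdb"

// newOptions enables the two feature flags added here; they mirror the
// 'created-timestamp-zero-ingestion' and 'metadata-wal-records' feature
// flags on the Prometheus side and are plumbed into HeadOptions on open.
func newOptions() *tsdb.Options {
	opts := tsdb.DefaultOptions()
	opts.EnableSTAsZeroSample = true     // Store ST as a best-effort zero sample before the real sample.
	opts.EnableMetadataWALRecords = true // Persist per-series metadata as WAL records.
	return opts
}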
@ -973,6 +987,8 @@ func open(dir string, l *slog.Logger, r prometheus.Registerer, opts *Options, rn
headOpts.OutOfOrderTimeWindow.Store(opts.OutOfOrderTimeWindow)
headOpts.OutOfOrderCapMax.Store(opts.OutOfOrderCapMax)
headOpts.EnableSharding = opts.EnableSharding
headOpts.EnableSTAsZeroSample = opts.EnableSTAsZeroSample
headOpts.EnableMetadataWALRecords = opts.EnableMetadataWALRecords
if opts.WALReplayConcurrency > 0 {
headOpts.WALReplayConcurrency = opts.WALReplayConcurrency
}
@ -1136,11 +1152,16 @@ func (db *DB) run(ctx context.Context) {
}
}
// Appender opens a new appender against the database.
// Appender opens a new Appender against the database.
func (db *DB) Appender(ctx context.Context) storage.Appender {
return dbAppender{db: db, Appender: db.head.Appender(ctx)}
}
// AppenderV2 opens a new AppenderV2 against the database.
func (db *DB) AppenderV2(ctx context.Context) storage.AppenderV2 {
return dbAppenderV2{db: db, AppenderV2: db.head.AppenderV2(ctx)}
}
// ApplyConfig applies a new config to the DB.
// Behaviour of 'OutOfOrderTimeWindow' is as follows:
// OOO enabled = oooTimeWindow > 0. OOO disabled = oooTimeWindow is 0.
@ -1254,6 +1275,36 @@ func (a dbAppender) Commit() error {
return err
}
// dbAppenderV2 wraps the DB's head appender and triggers compactions on commit
// if necessary.
type dbAppenderV2 struct {
storage.AppenderV2
db *DB
}
var _ storage.GetRef = dbAppenderV2{}
func (a dbAppenderV2) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) {
if g, ok := a.AppenderV2.(storage.GetRef); ok {
return g.GetRef(lset, hash)
}
return 0, labels.EmptyLabels()
}
func (a dbAppenderV2) Commit() error {
err := a.AppenderV2.Commit()
// We could just run this check every few minutes practically. But for benchmarks
// and high frequency use cases this is the safer way.
if a.db.head.compactable() {
select {
case a.db.compactc <- struct{}{}:
default:
}
}
return err
}
// waitingForCompactionDelay returns true if the DB is waiting for the Head compaction delay.
// This doesn't guarantee that the Head is really compactable.
func (db *DB) waitingForCompactionDelay() bool {

tsdb/db_append_v2_test.go (new file, 7621 lines); diff suppressed because it is too large.


@ -187,6 +187,20 @@ type HeadOptions struct {
// EnableSharding enables ShardedPostings() support in the Head.
EnableSharding bool
// EnableSTAsZeroSample represents 'created-timestamp-zero-ingestion' feature flag.
// If true, ST, if non-zero and earlier than sample timestamp, will be stored
// as a zero sample before the actual sample.
//
// The zero sample is best-effort; only a debug log is emitted on failure.
// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
// is implemented.
EnableSTAsZeroSample bool
// EnableMetadataWALRecords represents 'metadata-wal-records' feature flag.
// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
// is implemented.
EnableMetadataWALRecords bool
}
const (


@ -165,17 +165,19 @@ func (h *Head) appender() *headAppender {
minValidTime := h.appendableMinValidTime()
appendID, cleanupAppendIDsBelow := h.iso.newAppendID(minValidTime) // Every appender gets an ID that is cleared upon commit/rollback.
return &headAppender{
head: h,
minValidTime: minValidTime,
mint: math.MaxInt64,
maxt: math.MinInt64,
headMaxt: h.MaxTime(),
oooTimeWindow: h.opts.OutOfOrderTimeWindow.Load(),
seriesRefs: h.getRefSeriesBuffer(),
series: h.getSeriesBuffer(),
typesInBatch: h.getTypeMap(),
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
headAppenderBase: headAppenderBase{
head: h,
minValidTime: minValidTime,
mint: math.MaxInt64,
maxt: math.MinInt64,
headMaxt: h.MaxTime(),
oooTimeWindow: h.opts.OutOfOrderTimeWindow.Load(),
seriesRefs: h.getRefSeriesBuffer(),
series: h.getSeriesBuffer(),
typesInBatch: h.getTypeMap(),
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
},
}
}
@ -382,7 +384,7 @@ func (b *appendBatch) close(h *Head) {
b.exemplars = nil
}
type headAppender struct {
type headAppenderBase struct {
head *Head
minValidTime int64 // No samples below this timestamp are allowed.
mint, maxt int64
@ -397,7 +399,10 @@ type headAppender struct {
appendID, cleanupAppendIDsBelow uint64
closed bool
hints *storage.AppendOptions
}
type headAppender struct {
headAppenderBase
hints *storage.AppendOptions
}
func (a *headAppender) SetOptions(opts *storage.AppendOptions) {
@ -525,7 +530,7 @@ func (a *headAppender) AppendSTZeroSample(ref storage.SeriesRef, lset labels.Lab
return storage.SeriesRef(s.ref), nil
}
func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bool, err error) {
func (a *headAppenderBase) getOrCreate(lset labels.Labels) (s *memSeries, created bool, err error) {
// Ensure no empty labels have gotten through.
lset = lset.WithoutEmpty()
if lset.IsEmpty() {
@ -550,7 +555,7 @@ func (a *headAppender) getOrCreate(lset labels.Labels) (s *memSeries, created bo
// getCurrentBatch returns the current batch if it fits the provided sampleType
// for the provided series. Otherwise, it adds a new batch and returns it.
func (a *headAppender) getCurrentBatch(st sampleType, s chunks.HeadSeriesRef) *appendBatch {
func (a *headAppenderBase) getCurrentBatch(st sampleType, s chunks.HeadSeriesRef) *appendBatch {
h := a.head
newBatch := func() *appendBatch {
@ -1043,7 +1048,7 @@ func (a *headAppender) UpdateMetadata(ref storage.SeriesRef, lset labels.Labels,
var _ storage.GetRef = &headAppender{}
func (a *headAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) {
func (a *headAppenderBase) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) {
s := a.head.series.getByHash(hash, lset)
if s == nil {
return 0, labels.EmptyLabels()
@ -1053,7 +1058,7 @@ func (a *headAppender) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRe
}
// log writes all headAppender's data to the WAL.
func (a *headAppender) log() error {
func (a *headAppenderBase) log() error {
if a.head.wal == nil {
return nil
}
@ -1185,7 +1190,7 @@ type appenderCommitContext struct {
}
// commitExemplars adds all exemplars from the provided batch to the head's exemplar storage.
func (a *headAppender) commitExemplars(b *appendBatch) {
func (a *headAppenderBase) commitExemplars(b *appendBatch) {
// No errors logging to WAL, so pass the exemplars along to the in memory storage.
for _, e := range b.exemplars {
s := a.head.series.getByID(chunks.HeadSeriesRef(e.ref))
@ -1205,7 +1210,7 @@ func (a *headAppender) commitExemplars(b *appendBatch) {
}
}
func (acc *appenderCommitContext) collectOOORecords(a *headAppender) {
func (acc *appenderCommitContext) collectOOORecords(a *headAppenderBase) {
if a.head.wbl == nil {
// WBL is not enabled. So no need to collect.
acc.wblSamples = nil
@ -1310,7 +1315,7 @@ func handleAppendableError(err error, appended, oooRejected, oobRejected, tooOld
// operations on the series after appending the samples.
//
// There are also specific functions to commit histograms and float histograms.
func (a *headAppender) commitFloats(b *appendBatch, acc *appenderCommitContext) {
func (a *headAppenderBase) commitFloats(b *appendBatch, acc *appenderCommitContext) {
var ok, chunkCreated bool
var series *memSeries
@ -1466,7 +1471,7 @@ func (a *headAppender) commitFloats(b *appendBatch, acc *appenderCommitContext)
}
// For details on the commitHistograms function, see the commitFloats docs.
func (a *headAppender) commitHistograms(b *appendBatch, acc *appenderCommitContext) {
func (a *headAppenderBase) commitHistograms(b *appendBatch, acc *appenderCommitContext) {
var ok, chunkCreated bool
var series *memSeries
@ -1575,7 +1580,7 @@ func (a *headAppender) commitHistograms(b *appendBatch, acc *appenderCommitConte
}
// For details on the commitFloatHistograms function, see the commitFloats docs.
func (a *headAppender) commitFloatHistograms(b *appendBatch, acc *appenderCommitContext) {
func (a *headAppenderBase) commitFloatHistograms(b *appendBatch, acc *appenderCommitContext) {
var ok, chunkCreated bool
var series *memSeries
@ -1697,7 +1702,7 @@ func commitMetadata(b *appendBatch) {
}
}
func (a *headAppender) unmarkCreatedSeriesAsPendingCommit() {
func (a *headAppenderBase) unmarkCreatedSeriesAsPendingCommit() {
for _, s := range a.series {
s.Lock()
s.pendingCommit = false
@ -1707,7 +1712,7 @@ func (a *headAppender) unmarkCreatedSeriesAsPendingCommit() {
// Commit writes to the WAL and adds the data to the Head.
// TODO(codesome): Refactor this method to reduce indentation and make it more readable.
func (a *headAppender) Commit() (err error) {
func (a *headAppenderBase) Commit() (err error) {
if a.closed {
return ErrAppenderClosed
}
@ -2238,7 +2243,7 @@ func handleChunkWriteError(err error) {
}
// Rollback removes the samples and exemplars from headAppender and writes any series to WAL.
func (a *headAppender) Rollback() (err error) {
func (a *headAppenderBase) Rollback() (err error) {
if a.closed {
return ErrAppenderClosed
}

tsdb/head_append_v2.go (new file, 396 lines)

@ -0,0 +1,396 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"context"
"errors"
"fmt"
"math"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/value"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
)
// initAppenderV2 is a helper to initialize the time bounds of the head
// upon the first sample it receives.
type initAppenderV2 struct {
app storage.AppenderV2
head *Head
}
var _ storage.GetRef = &initAppenderV2{}
func (a *initAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
if a.app == nil {
a.head.initTime(t)
a.app = a.head.appenderV2()
}
return a.app.Append(ref, ls, st, t, v, h, fh, opts)
}
func (a *initAppenderV2) GetRef(lset labels.Labels, hash uint64) (storage.SeriesRef, labels.Labels) {
if g, ok := a.app.(storage.GetRef); ok {
return g.GetRef(lset, hash)
}
return 0, labels.EmptyLabels()
}
func (a *initAppenderV2) Commit() error {
if a.app == nil {
a.head.metrics.activeAppenders.Dec()
return nil
}
return a.app.Commit()
}
func (a *initAppenderV2) Rollback() error {
if a.app == nil {
a.head.metrics.activeAppenders.Dec()
return nil
}
return a.app.Rollback()
}
// AppenderV2 returns a new AppenderV2 on the database.
func (h *Head) AppenderV2(context.Context) storage.AppenderV2 {
h.metrics.activeAppenders.Inc()
// The head cache might not have a starting point yet. The init appender
// picks up the first appended timestamp as the base.
if !h.initialized() {
return &initAppenderV2{
head: h,
}
}
return h.appenderV2()
}
func (h *Head) appenderV2() *headAppenderV2 {
minValidTime := h.appendableMinValidTime()
appendID, cleanupAppendIDsBelow := h.iso.newAppendID(minValidTime) // Every appender gets an ID that is cleared upon commit/rollback.
return &headAppenderV2{
headAppenderBase: headAppenderBase{
head: h,
minValidTime: minValidTime,
mint: math.MaxInt64,
maxt: math.MinInt64,
headMaxt: h.MaxTime(),
oooTimeWindow: h.opts.OutOfOrderTimeWindow.Load(),
seriesRefs: h.getRefSeriesBuffer(),
series: h.getSeriesBuffer(),
typesInBatch: h.getTypeMap(),
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
},
}
}
type headAppenderV2 struct {
headAppenderBase
}
func (a *headAppenderV2) Append(ref storage.SeriesRef, ls labels.Labels, st, t int64, v float64, h *histogram.Histogram, fh *histogram.FloatHistogram, opts storage.AOptions) (storage.SeriesRef, error) {
var (
// Avoid shadowing err variables for reliability.
valErr, appErr, partialErr error
sampleMetricType = sampleMetricTypeFloat
isStale bool
)
// Fail fast on incorrect histograms.
switch {
case fh != nil:
sampleMetricType = sampleMetricTypeHistogram
valErr = fh.Validate()
case h != nil:
sampleMetricType = sampleMetricTypeHistogram
valErr = h.Validate()
}
if valErr != nil {
return 0, valErr
}
// Fail fast if OOO is disabled and the sample is out of bounds.
// Otherwise, a full check will be done later to decide if the sample is in-order or out-of-order.
if a.oooTimeWindow == 0 && t < a.minValidTime {
a.head.metrics.outOfBoundSamples.WithLabelValues(sampleMetricType).Inc()
return 0, storage.ErrOutOfBounds
}
s := a.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
var err error
s, _, err = a.getOrCreate(ls)
if err != nil {
return 0, err
}
}
// TODO(bwplotka): Handle ST natively (as per PROM-60).
if a.head.opts.EnableSTAsZeroSample && st != 0 {
a.bestEffortAppendSTZeroSample(s, st, t, h, fh)
}
switch {
case fh != nil:
isStale = value.IsStaleNaN(fh.Sum)
appErr = a.appendFloatHistogram(s, t, fh, opts.RejectOutOfOrder)
case h != nil:
isStale = value.IsStaleNaN(h.Sum)
appErr = a.appendHistogram(s, t, h, opts.RejectOutOfOrder)
default:
isStale = value.IsStaleNaN(v)
if isStale {
// If we have added a sample before with this same appender, we
// can check the previously used type and turn a stale float
// sample into a stale histogram sample or stale float histogram
// sample as appropriate. This prevents an unnecessary creation
// of a new batch. However, since other appenders might append
// to the same series concurrently, this is not perfect but just
// an optimization for the more likely case.
switch a.typesInBatch[s.ref] {
case stHistogram, stCustomBucketHistogram:
return a.Append(ref, ls, st, t, 0, &histogram.Histogram{Sum: v}, nil, storage.AOptions{
RejectOutOfOrder: opts.RejectOutOfOrder,
})
case stFloatHistogram, stCustomBucketFloatHistogram:
return a.Append(ref, ls, st, t, 0, nil, &histogram.FloatHistogram{Sum: v}, storage.AOptions{
RejectOutOfOrder: opts.RejectOutOfOrder,
})
}
// Note that a series reference not yet in the map will come out
// as stNone, but since we do not handle that case separately,
// we do not need to check for the difference between "unknown
// series" and "known series with stNone".
}
appErr = a.appendFloat(s, t, v, opts.RejectOutOfOrder)
}
// Handle append error, if any.
if appErr != nil {
switch {
case errors.Is(appErr, storage.ErrOutOfOrderSample):
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricType).Inc()
case errors.Is(appErr, storage.ErrTooOldSample):
a.head.metrics.tooOldSamples.WithLabelValues(sampleMetricType).Inc()
}
return 0, appErr
}
if t < a.mint {
a.mint = t
}
if t > a.maxt {
a.maxt = t
}
if isStale {
// For stale values we never attempt to process metadata/exemplars, so claim success.
return ref, nil
}
// Append exemplars if any and if storage was configured for it.
if len(opts.Exemplars) > 0 && a.head.opts.EnableExemplarStorage && a.head.opts.MaxExemplars.Load() > 0 {
// Currently only exemplars can return partial errors.
partialErr = a.appendExemplars(s, opts.Exemplars)
}
// TODO(bwplotka): Move/reuse metadata tests from scrape, once scrape adopts AppenderV2.
// Currently the tsdb package does not test metadata.
if a.head.opts.EnableMetadataWALRecords && !opts.Metadata.IsEmpty() {
s.Lock()
metaChanged := s.meta == nil || !s.meta.Equals(opts.Metadata)
s.Unlock()
if metaChanged {
b := a.getCurrentBatch(stNone, s.ref)
b.metadata = append(b.metadata, record.RefMetadata{
Ref: s.ref,
Type: record.GetMetricType(opts.Metadata.Type),
Unit: opts.Metadata.Unit,
Help: opts.Metadata.Help,
})
b.metadataSeries = append(b.metadataSeries, s)
}
}
return storage.SeriesRef(s.ref), partialErr
}
func (a *headAppenderV2) appendFloat(s *memSeries, t int64, v float64, fastRejectOOO bool) error {
s.Lock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
isOOO, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if isOOO && fastRejectOOO {
s.Unlock()
return storage.ErrOutOfOrderSample
}
if err == nil {
s.pendingCommit = true
}
s.Unlock()
if delta > 0 {
a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
}
if err != nil {
return err
}
b := a.getCurrentBatch(stFloat, s.ref)
b.floats = append(b.floats, record.RefSample{Ref: s.ref, T: t, V: v})
b.floatSeries = append(b.floatSeries, s)
return nil
}
func (a *headAppenderV2) appendHistogram(s *memSeries, t int64, h *histogram.Histogram, fastRejectOOO bool) error {
s.Lock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
isOOO, delta, err := s.appendableHistogram(t, h, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if isOOO && fastRejectOOO {
s.Unlock()
return storage.ErrOutOfOrderSample
}
if err == nil {
s.pendingCommit = true
}
s.Unlock()
if delta > 0 {
a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
}
if err != nil {
return err
}
st := stHistogram
if h.UsesCustomBuckets() {
st = stCustomBucketHistogram
}
b := a.getCurrentBatch(st, s.ref)
b.histograms = append(b.histograms, record.RefHistogramSample{Ref: s.ref, T: t, H: h})
b.histogramSeries = append(b.histogramSeries, s)
return nil
}
func (a *headAppenderV2) appendFloatHistogram(s *memSeries, t int64, fh *histogram.FloatHistogram, fastRejectOOO bool) error {
s.Lock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
isOOO, delta, err := s.appendableFloatHistogram(t, fh, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if isOOO && fastRejectOOO {
s.Unlock()
return storage.ErrOutOfOrderSample
}
if err == nil {
s.pendingCommit = true
}
s.Unlock()
if delta > 0 {
a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
}
if err != nil {
return err
}
st := stFloatHistogram
if fh.UsesCustomBuckets() {
st = stCustomBucketFloatHistogram
}
b := a.getCurrentBatch(st, s.ref)
b.floatHistograms = append(b.floatHistograms, record.RefFloatHistogramSample{Ref: s.ref, T: t, FH: fh})
b.floatHistogramSeries = append(b.floatHistogramSeries, s)
return nil
}
func (a *headAppenderV2) appendExemplars(s *memSeries, exemplar []exemplar.Exemplar) error {
var errs []error
for _, e := range exemplar {
// Ensure no empty labels have gotten through.
e.Labels = e.Labels.WithoutEmpty()
if err := a.head.exemplars.ValidateExemplar(s.labels(), e); err != nil {
if !errors.Is(err, storage.ErrDuplicateExemplar) && !errors.Is(err, storage.ErrExemplarsDisabled) {
// Except for duplicates and disabled exemplar storage, return partial errors.
errs = append(errs, err)
}
if !errors.Is(err, storage.ErrOutOfOrderExemplar) {
a.head.logger.Debug("Error while adding an exemplar on AppendSample", "exemplars", fmt.Sprintf("%+v", e), "err", e)
}
continue
}
b := a.getCurrentBatch(stNone, s.ref)
b.exemplars = append(b.exemplars, exemplarWithSeriesRef{storage.SeriesRef(s.ref), e})
}
if len(errs) > 0 {
return &storage.AppendPartialError{ExemplarErrors: errs}
}
return nil
}
// NOTE(bwplotka): This feature might be deprecated and removed once PROM-60
// is implemented.
//
// ST is an experimental feature; we don't fail the append on errors, we just log at debug level.
func (a *headAppenderV2) bestEffortAppendSTZeroSample(s *memSeries, st, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) {
if st >= t {
a.head.logger.Debug("Error when appending ST", "series", s.lset.String(), "st", st, "t", t, "err", storage.ErrSTNewerThanSample)
return
}
if st < a.minValidTime {
a.head.logger.Debug("Error when appending ST", "series", s.lset.String(), "st", st, "t", t, "err", storage.ErrOutOfBounds)
return
}
var err error
switch {
case fh != nil:
zeroFloatHistogram := &histogram.FloatHistogram{
// The STZeroSample represents a counter reset by definition.
CounterResetHint: histogram.CounterReset,
// Replicate other fields to avoid needless chunk creation.
Schema: fh.Schema,
ZeroThreshold: fh.ZeroThreshold,
CustomValues: fh.CustomValues,
}
err = a.appendFloatHistogram(s, st, zeroFloatHistogram, true)
case h != nil:
zeroHistogram := &histogram.Histogram{
// The STZeroSample represents a counter reset by definition.
CounterResetHint: histogram.CounterReset,
// Replicate other fields to avoid needless chunk creation.
Schema: h.Schema,
ZeroThreshold: h.ZeroThreshold,
CustomValues: h.CustomValues,
}
err = a.appendHistogram(s, st, zeroHistogram, true)
default:
err = a.appendFloat(s, st, 0, true)
}
if err != nil {
if errors.Is(err, storage.ErrOutOfOrderSample) {
// OOO errors are common and expected (cumulative). Explicitly ignored.
return
}
a.head.logger.Debug("Error when appending ST", "series", s.lset.String(), "st", st, "t", t, "err", err)
return
}
if st > a.maxt {
a.maxt = st
}
}
var _ storage.GetRef = &headAppenderV2{}
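A hypothetical sketch (not taken from the PR) of the GetRef fast path that downstream projects use to avoid maintaining a parallel set of references, applied to AppenderV2:
package example

import (
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// appendWithRefCache looks up a cached series reference first, so repeated
// appends for the same series can skip the ls -> ref computation and reuse
// the already-stored label set.
func appendWithRefCache(app storage.AppenderV2, ls labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
	var ref storage.SeriesRef
	if g, ok := app.(storage.GetRef); ok {
		if cachedRef, cachedLs := g.GetRef(ls, ls.Hash()); cachedRef != 0 {
			ref, ls = cachedRef, cachedLs
		}
	}
	return app.Append(ref, ls, 0, t, v, nil, nil, storage.AOptions{})
}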

tsdb/head_append_v2_test.go (new file, 4724 lines); diff suppressed because it is too large.


@ -14,7 +14,6 @@
package tsdb
import (
"context"
"errors"
"fmt"
"math/rand"
@ -32,6 +31,228 @@ import (
"github.com/prometheus/prometheus/util/compression"
)
type benchAppendFunc func(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction
func appendV1Float(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction {
var err error
app := h.Appender(b.Context())
for _, s := range series {
var ref storage.SeriesRef
for sampleIndex := range samplesPerAppend {
ref, err = app.Append(ref, s.Labels(), ts+sampleIndex, float64(ts+sampleIndex))
require.NoError(b, err)
}
}
return app
}
func appendV2Float(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction {
var err error
app := h.AppenderV2(b.Context())
for _, s := range series {
var ref storage.SeriesRef
for sampleIndex := range samplesPerAppend {
ref, err = app.Append(ref, s.Labels(), 0, ts+sampleIndex, float64(ts+sampleIndex), nil, nil, storage.AOptions{})
require.NoError(b, err)
}
}
return app
}
func appendV1FloatOrHistogramWithExemplars(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction {
var err error
app := h.Appender(b.Context())
for i, s := range series {
var ref storage.SeriesRef
for sampleIndex := range samplesPerAppend {
// if i is even, append a sample, else append a histogram.
if i%2 == 0 {
ref, err = app.Append(ref, s.Labels(), ts+sampleIndex, float64(ts+sampleIndex))
require.NoError(b, err)
// Every sample also has an exemplar attached.
_, err = app.AppendExemplar(ref, s.Labels(), exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
})
require.NoError(b, err)
continue
}
h := &histogram.Histogram{
Count: 7 + uint64(ts*5),
ZeroCount: 2 + uint64(ts),
ZeroThreshold: 0.001,
Sum: 18.4 * rand.Float64(),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{ts + 1, 1, -1, 0},
}
ref, err = app.AppendHistogram(ref, s.Labels(), ts, h, nil)
require.NoError(b, err)
// Every histogram sample also has 3 exemplars attached.
_, err = app.AppendExemplar(ref, s.Labels(), exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
})
require.NoError(b, err)
_, err = app.AppendExemplar(ref, s.Labels(), exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
})
require.NoError(b, err)
_, err = app.AppendExemplar(ref, s.Labels(), exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
})
require.NoError(b, err)
}
}
return app
}
func appendV2FloatOrHistogramWithExemplars(b *testing.B, h *Head, ts int64, series []storage.Series, samplesPerAppend int64) storage.AppenderTransaction {
var (
err error
ex = make([]exemplar.Exemplar, 3)
)
app := h.AppenderV2(b.Context())
for i, s := range series {
var ref storage.SeriesRef
for sampleIndex := range samplesPerAppend {
aOpts := storage.AOptions{Exemplars: ex[:0]}
// if i is even, append a sample, else append a histogram.
if i%2 == 0 {
// Every sample also has an exemplar attached.
aOpts.Exemplars = append(aOpts.Exemplars, exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
})
ref, err = app.Append(ref, s.Labels(), 0, ts, float64(ts), nil, nil, aOpts)
require.NoError(b, err)
continue
}
h := &histogram.Histogram{
Count: 7 + uint64(ts*5),
ZeroCount: 2 + uint64(ts),
ZeroThreshold: 0.001,
Sum: 18.4 * rand.Float64(),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{ts + 1, 1, -1, 0},
}
// Every histogram sample also has 3 exemplars attached.
aOpts.Exemplars = append(aOpts.Exemplars,
exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
},
exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
},
exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts + sampleIndex,
},
)
ref, err = app.Append(ref, s.Labels(), 0, ts, 0, h, nil, aOpts)
require.NoError(b, err)
}
}
return app
}
type appendCase struct {
name string
appendFunc benchAppendFunc
}
func appendCases() []appendCase {
return []appendCase{
{
name: "appender=v1/case=floats",
appendFunc: appendV1Float,
},
{
name: "appender=v2/case=floats",
appendFunc: appendV2Float,
},
{
name: "appender=v1/case=floatsHistogramsExemplars",
appendFunc: appendV1FloatOrHistogramWithExemplars,
},
{
name: "appender=v2/case=floatsHistogramsExemplars",
appendFunc: appendV2FloatOrHistogramWithExemplars,
},
}
}
/*
export bench=append && go test \
-run '^$' -bench '^BenchmarkHeadAppender_AppendCommit$' \
-benchtime 5s -count 6 -cpu 2 -timeout 999m \
| tee ${bench}.txt
*/
func BenchmarkHeadAppender_AppendCommit(b *testing.B) {
// NOTE(bwplotka): Previously we also had 1k and 10k series cases. There is nothing
// special happening in 100 vs 1k vs 10k, so let's save a considerable amount of benchmark time
// for quicker feedback. In return, we add more sample type cases.
// Similarly, we removed the 2-samples-per-append case.
//
// TODO(bwplotka): This still takes ~6500s (~2h) for -benchtime 5s -count 6 to complete.
// We might want to reduce the time a bit more. 5s is really important as the slowest
// case (appender=v1/case=floatsHistogramsExemplars/series=100/samples_per_append=100-2)
// yields only 255 iters at 23184892 ns/op in 5s. Perhaps -benchtime=300x would be better?
seriesCounts := []int{10, 100}
series := genSeries(100, 10, 0, 0) // Only using the generated labels.
for _, appendCase := range appendCases() {
for _, seriesCount := range seriesCounts {
for _, samplesPerAppend := range []int64{1, 5, 100} {
b.Run(fmt.Sprintf("%s/series=%d/samples_per_append=%d", appendCase.name, seriesCount, samplesPerAppend), func(b *testing.B) {
opts := newTestHeadDefaultOptions(10000, false)
opts.EnableExemplarStorage = true // Some cases append exemplars, so enable their storage.
h, _ := newTestHeadWithOptions(b, compression.None, opts)
b.Cleanup(func() { require.NoError(b, h.Close()) })
ts := int64(1000)
// Init the series; series creation is not what we're benchmarking here.
app := appendCase.appendFunc(b, h, ts, series[:seriesCount], samplesPerAppend)
require.NoError(b, app.Commit())
ts += 1000 // should increment more than highest samplesPerAppend
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
app := appendCase.appendFunc(b, h, ts, series[:seriesCount], samplesPerAppend)
require.NoError(b, app.Commit())
ts += 1000 // should increment more than highest samplesPerAppend
}
})
}
}
}
}
func BenchmarkHeadStripeSeriesCreate(b *testing.B) {
chunkDir := b.TempDir()
// Put a series, select it. GC it and then access it.
@ -86,86 +307,6 @@ func BenchmarkHeadStripeSeriesCreate_PreCreationFailure(b *testing.B) {
}
}
func BenchmarkHead_WalCommit(b *testing.B) {
seriesCounts := []int{100, 1000, 10000}
series := genSeries(10000, 10, 0, 0) // Only using the generated labels.
appendSamples := func(b *testing.B, app storage.Appender, seriesCount int, ts int64) {
var err error
for i, s := range series[:seriesCount] {
var ref storage.SeriesRef
// if i is even, append a sample, else append a histogram.
if i%2 == 0 {
ref, err = app.Append(ref, s.Labels(), ts, float64(ts))
} else {
h := &histogram.Histogram{
Count: 7 + uint64(ts*5),
ZeroCount: 2 + uint64(ts),
ZeroThreshold: 0.001,
Sum: 18.4 * rand.Float64(),
Schema: 1,
PositiveSpans: []histogram.Span{
{Offset: 0, Length: 2},
{Offset: 1, Length: 2},
},
PositiveBuckets: []int64{ts + 1, 1, -1, 0},
}
ref, err = app.AppendHistogram(ref, s.Labels(), ts, h, nil)
}
require.NoError(b, err)
_, err = app.AppendExemplar(ref, s.Labels(), exemplar.Exemplar{
Labels: labels.FromStrings("trace_id", strconv.Itoa(rand.Int())),
Value: rand.Float64(),
Ts: ts,
})
require.NoError(b, err)
}
}
for _, seriesCount := range seriesCounts {
b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) {
for _, commits := range []int64{1, 2} { // To test commits that create new series and when the series already exists.
b.Run(fmt.Sprintf("%d commits", commits), func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
b.StopTimer()
h, w := newTestHead(b, 10000, compression.None, false)
b.Cleanup(func() {
if h != nil {
h.Close()
}
if w != nil {
w.Close()
}
})
app := h.Appender(context.Background())
appendSamples(b, app, seriesCount, 0)
b.StartTimer()
require.NoError(b, app.Commit())
if commits == 2 {
b.StopTimer()
app = h.Appender(context.Background())
appendSamples(b, app, seriesCount, 1)
b.StartTimer()
require.NoError(b, app.Commit())
}
b.StopTimer()
h.Close()
h = nil
w.Close()
w = nil
}
})
}
})
}
}
type failingSeriesLifecycleCallback struct{}
func (failingSeriesLifecycleCallback) PreCreation(labels.Labels) error { return errors.New("failed") }


@ -107,49 +107,6 @@ func BenchmarkCreateSeries(b *testing.B) {
}
}
func BenchmarkHeadAppender_Append_Commit_ExistingSeries(b *testing.B) {
seriesCounts := []int{100, 1000, 10000}
series := genSeries(10000, 10, 0, 0)
for _, seriesCount := range seriesCounts {
b.Run(fmt.Sprintf("%d series", seriesCount), func(b *testing.B) {
for _, samplesPerAppend := range []int64{1, 2, 5, 100} {
b.Run(fmt.Sprintf("%d samples per append", samplesPerAppend), func(b *testing.B) {
h, _ := newTestHead(b, 10000, compression.None, false)
b.Cleanup(func() { require.NoError(b, h.Close()) })
ts := int64(1000)
appendSamples := func() error {
var err error
app := h.Appender(context.Background())
for _, s := range series[:seriesCount] {
var ref storage.SeriesRef
for sampleIndex := range samplesPerAppend {
ref, err = app.Append(ref, s.Labels(), ts+sampleIndex, float64(ts+sampleIndex))
if err != nil {
return err
}
}
}
ts += 1000 // should increment more than highest samplesPerAppend
return app.Commit()
}
// Init series, that's not what we're benchmarking here.
require.NoError(b, appendSamples())
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
require.NoError(b, appendSamples())
}
})
}
})
}
}
func populateTestWL(t testing.TB, w *wlog.WL, recs []any, buf []byte) []byte {
var enc record.Encoder
for _, r := range recs {
@ -5941,7 +5898,7 @@ func TestOOOAppendWithNoSeries(t *testing.T) {
}
}
func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)) {
func testOOOAppendWithNoSeries(t *testing.T, appendFunc func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compression.Snappy)
require.NoError(t, err)
@ -6284,6 +6241,7 @@ func TestSnapshotAheadOfWALError(t *testing.T) {
require.NoError(t, head.Close())
}
// TODO(bwplotka): Bad benchmark (no b.Loop/b.N), fix or remove.
func BenchmarkCuttingHeadHistogramChunks(b *testing.B) {
const (
numSamples = 50000
@ -6579,6 +6537,8 @@ func TestWALSampleAndExemplarOrder(t *testing.T) {
// would trigger the
// `signal SIGSEGV: segmentation violation code=0x1 addr=0x20 pc=0xbb03d1`
// panic, that we have seen in the wild once.
//
// TODO(bwplotka): This can no longer happen in AppenderV2; remove once AppenderV1 is removed, see #17632.
func TestHeadCompactionWhileAppendAndCommitExemplar(t *testing.T) {
h, _ := newTestHead(t, DefaultBlockDuration, compression.None, false)
app := h.Appender(context.Background())


@ -44,14 +44,14 @@ type testValue struct {
type sampleTypeScenario struct {
sampleType string
appendFunc func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)
appendFunc func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error)
sampleFunc func(ts, value int64) sample
}
var sampleTypeScenarios = map[string]sampleTypeScenario{
float: {
sampleType: sampleMetricTypeFloat,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
appendFunc: func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, f: float64(value)}
ref, err := appender.Append(0, lbls, ts, s.f)
return ref, s, err
@ -62,7 +62,7 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
},
intHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
appendFunc: func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, h: tsdbutil.GenerateTestHistogram(value)}
ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
return ref, s, err
@ -73,7 +73,7 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
},
floatHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
appendFunc: func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, fh: tsdbutil.GenerateTestFloatHistogram(value)}
ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
return ref, s, err
@ -84,7 +84,7 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
},
customBucketsIntHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
appendFunc: func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, h: tsdbutil.GenerateTestCustomBucketsHistogram(value)}
ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
return ref, s, err
@ -95,7 +95,7 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
},
customBucketsFloatHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
appendFunc: func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, fh: tsdbutil.GenerateTestCustomBucketsFloatHistogram(value)}
ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
return ref, s, err
@ -106,7 +106,7 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
},
gaugeIntHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
appendFunc: func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, h: tsdbutil.GenerateTestGaugeHistogram(value)}
ref, err := appender.AppendHistogram(0, lbls, ts, s.h, nil)
return ref, s, err
@ -117,7 +117,7 @@ var sampleTypeScenarios = map[string]sampleTypeScenario{
},
gaugeFloatHistogram: {
sampleType: sampleMetricTypeHistogram,
appendFunc: func(appender storage.Appender, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
appendFunc: func(appender storage.LimitedAppenderV1, lbls labels.Labels, ts, value int64) (storage.SeriesRef, sample, error) {
s := sample{t: ts, fh: tsdbutil.GenerateTestGaugeFloatHistogram(value)}
ref, err := appender.AppendHistogram(0, lbls, ts, nil, s.fh)
return ref, s, err