From 934bb2dbc69ed209e8954eaefe988e5d2aa0b6f6 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Thu, 23 Nov 2023 19:25:00 -0800 Subject: [PATCH] wip, rework compression testing after format PR was merged Signed-off-by: Callum Styan --- cmd/prometheus/main.go | 8 +- storage/remote/codec.go | 84 ++++----- storage/remote/codec_test.go | 24 +-- storage/remote/compression.go | 124 ++++++++----- storage/remote/compression_test.go | 118 ++++++------ storage/remote/queue_manager.go | 129 +++----------- storage/remote/queue_manager_test.go | 258 +++++++++++++++------------ storage/remote/read_test.go | 2 +- storage/remote/storage.go | 4 +- storage/remote/storage_test.go | 8 +- storage/remote/write.go | 5 +- storage/remote/write_handler.go | 15 +- storage/remote/write_handler_test.go | 135 +++++++------- storage/remote/write_test.go | 14 +- web/api/v1/api.go | 3 +- web/web.go | 2 + 16 files changed, 474 insertions(+), 459 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index a5a0a176ed..43c18b464b 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -155,7 +155,8 @@ type flagConfig struct { enablePerStepStats bool enableAutoGOMAXPROCS bool // todo: how to use the enable feature flag properly + use the remote format enum type - rwFormat int + rwFormat int + rwCompression int prometheusURL string corsRegexString string @@ -428,6 +429,9 @@ func main() { a.Flag("remote-write-format", "remote write proto format to use, valid options: 0 (1.0), 1 (reduced format), 3 (min64 format)"). Default("0").IntVar(&cfg.rwFormat) + a.Flag("remote-write-compression", "remote write compression to use, valid options: 0 (snappy), 1 (zstd), 3 (flate)"). + Default("0").IntVar(&cfg.rwCompression) + promlogflag.AddFlags(a, &cfg.promlogConfig) a.Flag("write-documentation", "Generate command line documentation. Internal use.").Hidden().Action(func(ctx *kingpin.ParseContext) error { @@ -600,7 +604,7 @@ func main() { var ( localStorage = &readyStorage{stats: tsdb.NewDBStats()} scraper = &readyScrapeManager{} - remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, remote.RemoteWriteFormat(cfg.rwFormat)) + remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, localStoragePath, time.Duration(cfg.RemoteFlushDeadline), scraper, remote.RemoteWriteFormat(cfg.rwFormat), remote.RemoteWriteCompression(cfg.rwCompression)) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) ) diff --git a/storage/remote/codec.go b/storage/remote/codec.go index f2229e7a21..7e322a26c7 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -861,14 +861,14 @@ func metricTypeToMetricTypeProto(t textparse.MetricType) prompb.MetricMetadata_M // DecodeWriteRequest from an io.Reader into a prompb.WriteRequest, handling // snappy decompression. -func DecodeWriteRequest(r io.Reader) (*prompb.WriteRequest, error) { +func DecodeWriteRequest(r io.Reader, c *Compressor) (*prompb.WriteRequest, error) { compressed, err := io.ReadAll(r) if err != nil { return nil, err } - comp := GetPooledComp() - defer PutPooledComp(comp) + comp := c.GetPooledCompressor() + defer c.PutPooledCompressor(comp) reqBuf, err := comp.Decompress(compressed) if err != nil { @@ -938,45 +938,45 @@ func DecodeOTLPWriteRequest(r *http.Request) (pmetricotlp.ExportRequest, error) // DecodeMinimizedWriteRequest from an io.Reader into a prompb.WriteRequest, handling // snappy decompression. -func DecodeMinimizedWriteRequest(r io.Reader) (*prompb.MinimizedWriteRequest, error) { - compressed, err := io.ReadAll(r) - if err != nil { - return nil, err - } - - comp := GetPooledComp() - defer PutPooledComp(comp) - reqBuf, err := comp.Decompress(compressed) - if err != nil { - return nil, err - } - - var req prompb.MinimizedWriteRequest - if err := proto.Unmarshal(reqBuf, &req); err != nil { - return nil, err - } - - return &req, nil -} - -func DecodeMinimizedWriteRequestLen(r io.Reader) (*prompb.MinimizedWriteRequestLen, error) { - compressed, err := io.ReadAll(r) - if err != nil { - return nil, err - } - - reqBuf, err := snappy.Decode(nil, compressed) - if err != nil { - return nil, err - } - - var req prompb.MinimizedWriteRequestLen - if err := proto.Unmarshal(reqBuf, &req); err != nil { - return nil, err - } - - return &req, nil -} +//func DecodeMinimizedWriteRequest(r io.Reader) (*prompb.MinimizedWriteRequest, error) { +// compressed, err := io.ReadAll(r) +// if err != nil { +// return nil, err +// } +// +// comp := c.GetPooledCompressor() +// defer c.PutPooledCompressor(comp) +// reqBuf, err := comp.Decompress(compressed) +// if err != nil { +// return nil, err +// } +// +// var req prompb.MinimizedWriteRequest +// if err := proto.Unmarshal(reqBuf, &req); err != nil { +// return nil, err +// } +// +// return &req, nil +//} +// +//func DecodeMinimizedWriteRequestLen(r io.Reader) (*prompb.MinimizedWriteRequestLen, error) { +// compressed, err := io.ReadAll(r) +// if err != nil { +// return nil, err +// } +// +// reqBuf, err := snappy.Decode(nil, compressed) +// if err != nil { +// return nil, err +// } +// +// var req prompb.MinimizedWriteRequestLen +// if err := proto.Unmarshal(reqBuf, &req); err != nil { +// return nil, err +// } +// +// return &req, nil +//} func MinimizedWriteRequestToWriteRequest(redReq *prompb.MinimizedWriteRequest) (*prompb.WriteRequest, error) { req := &prompb.WriteRequest{ diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index 03ab16aac0..6550c199b8 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -550,23 +550,25 @@ func TestMetricTypeToMetricTypeProto(t *testing.T) { } func TestDecodeWriteRequest(t *testing.T) { - buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) + c := NewCompressor(Snappy) + buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, &c) require.NoError(t, err) - actual, err := DecodeWriteRequest(bytes.NewReader(buf)) + actual, err := DecodeWriteRequest(bytes.NewReader(buf), &c) require.NoError(t, err) require.Equal(t, writeRequestFixture, actual) } -func TestDecodeMinWriteRequest(t *testing.T) { - buf, _, err := buildMinimizedWriteRequest(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil) - - require.NoError(t, err) - - actual, err := DecodeMinimizedWriteRequest(bytes.NewReader(buf)) - require.NoError(t, err) - require.Equal(t, writeRequestMinimizedFixture, actual) -} +//func TestDecodeMinWriteRequest(t *testing.T) { +// c := NewCompressor(Snappy) +// buf, _, err := buildMinimizedWriteRequest(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil) +// +// require.NoError(t, err) +// +// actual, err := DecodeMinimizedWriteRequest(bytes.NewReader(buf)) +// require.NoError(t, err) +// require.Equal(t, writeRequestMinimizedFixture, actual) +//} func TestNilHistogramProto(t *testing.T) { // This function will panic if it impromperly handles nil diff --git a/storage/remote/compression.go b/storage/remote/compression.go index 184d4a6e68..c83d3cb930 100644 --- a/storage/remote/compression.go +++ b/storage/remote/compression.go @@ -3,6 +3,7 @@ package remote import ( "bytes" "compress/lzw" + "fmt" "io" "sync" @@ -24,67 +25,87 @@ type Compression interface { // hacky globals to easily tweak the compression algorithm and run some benchmarks type CompAlgorithm int -var UseAlgorithm = Snappy +//var UseAlgorithm = Snappy -const ( - Snappy CompAlgorithm = iota - SnappyAlt - S2 - ZstdFast - ZstdDefault - ZstdBestComp - Lzw - FlateFast - FlateDefault - FlateComp - BrotliFast - BrotliComp - BrotliDefault -) +//const ( +// Snappy CompAlgorithm = iota +// SnappyAlt +// S2 +// ZstdFast +// ZstdDefault +// ZstdBestComp +// Lzw +// FlateFast +// FlateDefault +// FlateComp +// BrotliFast +// BrotliComp +// BrotliDefault +//) // sync.Pool-ed createComp -var compPool = sync.Pool{ - // New optionally specifies a function to generate - // a value when Get would otherwise return nil. - New: func() interface{} { return createComp() }, +//var compPool = sync.Pool{ +// // New optionally specifies a function to generate +// // a value when Get would otherwise return nil. +// New: func() interface{} { return createComp() }, +//} + +type Compressor struct { + Compression + pool sync.Pool } -func GetPooledComp() Compression { - return compPool.Get().(Compression) +func NewCompressor(compType RemoteWriteCompression) Compressor { + var c Compressor + c.Compression = createComp(compType) + c.pool = sync.Pool{ + // New optionally specifies a function to generate + // a value when Get would otherwise return nil. + New: func() interface{} { return createComp(compType) }, + } + return c } -func PutPooledComp(c Compression) { - compPool.Put(c) +func (comp Compressor) GetPooledCompressor() Compression { + return comp.pool.Get().(Compression) } -var createComp func() Compression = func() Compression { - switch UseAlgorithm { +func (comp Compressor) PutPooledCompressor(c Compression) { + comp.pool.Put(c) +} + +func createComp(comp RemoteWriteCompression) Compression { + switch comp { case Snappy: return &snappyCompression{} - case SnappyAlt: - return &snappyAltCompression{} - case S2: - return &s2Compression{} - case ZstdDefault: + case Zstd: return &zstdCompression{level: zstd.SpeedDefault} - case ZstdFast: - return &zstdCompression{level: zstd.SpeedFastest} - case ZstdBestComp: - return &zstdCompression{level: zstd.SpeedBestCompression} - case Lzw: - return &lzwCompression{} - case FlateFast: - return &flateCompression{level: flate.BestSpeed} - case FlateComp: - return &flateCompression{level: flate.BestCompression} - case FlateDefault: + case Flate: return &flateCompression{level: flate.DefaultCompression} - case BrotliFast: - return &brotliCompression{quality: brotli.BestSpeed} - case BrotliDefault: - return &brotliCompression{quality: brotli.DefaultCompression} - case BrotliComp: - return &brotliCompression{quality: brotli.BestCompression} + //case SnappyAlt: + // return &snappyAltCompression{} + //case S2: + // return &s2Compression{} + //case ZstdDefault: + // return &zstdCompression{level: zstd.SpeedDefault} + //case ZstdFast: + // return &zstdCompression{level: zstd.SpeedFastest} + //case ZstdBestComp: + // return &zstdCompression{level: zstd.SpeedBestCompression} + //case Lzw: + // return &lzwCompression{} + //case FlateFast: + // return &flateCompression{level: flate.BestSpeed} + //case FlateComp: + // return &flateCompression{level: flate.BestCompression} + //case FlateDefault: + // return &flateCompression{level: flate.DefaultCompression} + //case BrotliFast: + // return &brotliCompression{quality: brotli.BestSpeed} + //case BrotliDefault: + // return &brotliCompression{quality: brotli.DefaultCompression} + //case BrotliComp: + // return &brotliCompression{quality: brotli.BestCompression} default: panic("unknown compression algorithm") } @@ -110,10 +131,17 @@ func (s *snappyCompression) Compress(data []byte) ([]byte, error) { if n := snappy.MaxEncodedLen(len(data)); n > cap(s.buf) { s.buf = make([]byte, n) } + uncompressed, err := snappy.Decode(nil, compressed) + if err != nil { + fmt.Println("error uncompressing immediately after compressing") + } + fmt.Println("len uncompressed: ", len(uncompressed)) + fmt.Println("returning snappy compressed data") return compressed, nil } func (s *snappyCompression) Decompress(data []byte) ([]byte, error) { s.buf = s.buf[0:cap(s.buf)] + fmt.Println("len data: ", len(data)) uncompressed, err := snappy.Decode(s.buf, data) if len(uncompressed) > cap(s.buf) { s.buf = uncompressed diff --git a/storage/remote/compression_test.go b/storage/remote/compression_test.go index 1e39fb25aa..4c8be37c4b 100644 --- a/storage/remote/compression_test.go +++ b/storage/remote/compression_test.go @@ -1,35 +1,40 @@ package remote import ( + "github.com/stretchr/testify/require" "os" "testing" "time" ) func TestCompressions(t *testing.T) { - data := makeUncompressedReducedWriteRequestBenchData(t) + data, err := makeUncompressedWriteRequestBenchData() + require.NoError(t, err) tc := []struct { name string - algo CompAlgorithm + algo RemoteWriteCompression }{ {"Snappy", Snappy}, - {"SnappyAlt", SnappyAlt}, - {"S2", S2}, - {"ZstdFast", ZstdFast}, - {"ZstdDefault", ZstdDefault}, - {"ZstdBestComp", ZstdBestComp}, - {"Lzw", Lzw}, - {"FlateFast", FlateFast}, - {"FlateComp", FlateComp}, - {"BrotliFast", BrotliFast}, - {"BrotliComp", BrotliComp}, - {"BrotliDefault", BrotliDefault}, + {"Zstd", Zstd}, + {"Flate", Flate}, + //{"SnappyAlt", SnappyAlt}, + //{"S2", S2}, + //{"ZstdFast", ZstdFast}, + //{"ZstdDefault", ZstdDefault}, + //{"ZstdBestComp", ZstdBestComp}, + //{"Lzw", Lzw}, + //{"FlateFast", FlateFast}, + //{"FlateComp", FlateComp}, + //{"BrotliFast", BrotliFast}, + //{"BrotliComp", BrotliComp}, + //{"BrotliDefault", BrotliDefault}, } for _, c := range tc { t.Run(c.name, func(t *testing.T) { - UseAlgorithm = c.algo - comp := createComp() + //UseAlgorithm = c.algo + //comp := createComp() + comp := NewCompressor(c.algo) compressed, err := comp.Compress(data) if err != nil { t.Fatal(err) @@ -49,61 +54,64 @@ func TestCompressions(t *testing.T) { func BenchmarkCompressions_V1(b *testing.B) { // Synthetic data, attempts to be representative - data := makeUncompressedWriteRequestBenchData(b) + data, err := makeUncompressedWriteRequestBenchData() + require.NoError(b, err) benchmarkCompressionsForData(b, [][]byte{data}) } -func BenchmarkCompressions_V11(b *testing.B) { - // Synthetic data, attempts to be representative - data := makeUncompressedWriteRequestBenchData(b) - benchmarkCompressionsForData(b, [][]byte{data}) -} +//func BenchmarkCompressions_V11(b *testing.B) { +// // Synthetic data, attempts to be representative +// data, err := makeUncompressedWriteRequestBenchData() +// require.NoError(b, err) +// benchmarkCompressionsForData(b, [][]byte{data}) +//} // Needs the dataset to be present in /home/nicolas/rw11data/v11_raw/ -func BenchmarkCompressions_V11_FileDataSet(b *testing.B) { - datas := readAllFiles("/home/nicolas/rw11data/v11_raw/") - if len(datas) != 10 { - b.Fatal("unexpected number of files") - } - benchmarkCompressionsForData(b, datas) -} +//func BenchmarkCompressions_V11_FileDataSet(b *testing.B) { +// datas := readAllFiles("/home/nicolas/rw11data/v11_raw/") +// if len(datas) != 10 { +// b.Fatal("unexpected number of files") +// } +// benchmarkCompressionsForData(b, datas) +//} // Needs the dataset to be present in /home/nicolas/rw11data/v1_raw/ -func BenchmarkCompressions_V1_FileDataSet(b *testing.B) { - datas := readAllFiles("/home/nicolas/rw11data/v1_raw/") - if len(datas) != 10 { - b.Fatal("unexpected number of files") - } - benchmarkCompressionsForData(b, datas) -} +//func BenchmarkCompressions_V1_FileDataSet(b *testing.B) { +// datas := readAllFiles("/home/nicolas/rw11data/v1_raw/") +// if len(datas) != 10 { +// b.Fatal("unexpected number of files") +// } +// benchmarkCompressionsForData(b, datas) +//} func benchmarkCompressionsForData(b *testing.B, datas [][]byte) { bc := []struct { name string - algo CompAlgorithm + algo RemoteWriteCompression }{ {"Snappy", Snappy}, - {"SnappyAlt", SnappyAlt}, - {"S2", S2}, - {"ZstdFast", ZstdFast}, - {"ZstdDefault", ZstdDefault}, - {"ZstdBestComp", ZstdBestComp}, - {"Lzw", Lzw}, - {"FlateFast", FlateFast}, - {"FlateDefault", FlateDefault}, - {"FlateComp", FlateComp}, - {"BrotliFast", BrotliFast}, - {"BrotliDefault", BrotliDefault}, - // {"BrotliComp", BrotliComp}, + {"Zstd", Zstd}, + {"Flate", Flate}, + //{"SnappyAlt", SnappyAlt}, + //{"S2", S2}, + //{"ZstdFast", ZstdFast}, + //{"ZstdDefault", ZstdDefault}, + //{"ZstdBestComp", ZstdBestComp}, + //{"Lzw", Lzw}, + //{"FlateFast", FlateFast}, + //{"FlateComp", FlateComp}, + //{"BrotliFast", BrotliFast}, + //{"BrotliComp", BrotliComp}, + //{"BrotliDefault", BrotliDefault}, } - comps := make(map[CompAlgorithm]Compression) - decomps := make(map[CompAlgorithm]Compression) + comps := make(map[RemoteWriteCompression]*Compressor) + decomps := make(map[RemoteWriteCompression]*Compressor) for _, c := range bc { - UseAlgorithm = c.algo - comp := createComp() - decomp := createComp() - comps[c.algo] = comp - decomps[c.algo] = decomp + comp := NewCompressor(c.algo) + decomp := NewCompressor(c.algo) + + comps[c.algo] = &comp + decomps[c.algo] = &decomp // warmup for i := 0; i < 2; i++ { for _, data := range datas { diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index c331fa1707..25253f8dc3 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -397,6 +397,14 @@ const ( MinLen // symbols are now just offsets, and we encode lengths as varints in the large symbols string (which is also now a byte slice) ) +type RemoteWriteCompression int64 + +const ( + Snappy RemoteWriteCompression = iota + Zstd + Flate +) + // QueueManager manages a queue of samples to be sent to the Storage // indicated by the provided WriteClient. Implements writeTo interface // used by WAL Watcher. @@ -426,6 +434,7 @@ type QueueManager struct { seriesSegmentMtx sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first. seriesSegmentIndexes map[chunks.HeadSeriesRef]int + compressor Compressor shards *shards numShards int reshardChan chan int @@ -463,6 +472,7 @@ func NewQueueManager( enableExemplarRemoteWrite bool, enableNativeHistogramRemoteWrite bool, rwFormat RemoteWriteFormat, + rwComp RemoteWriteCompression, ) *QueueManager { if logger == nil { logger = log.NewNopLogger() @@ -487,7 +497,8 @@ func NewQueueManager( sendNativeHistograms: enableNativeHistogramRemoteWrite, // TODO: we should eventually set the format via content negotiation, // so this field would be the desired format, maybe with a fallback? - rwFormat: rwFormat, + rwFormat: rwFormat, + compressor: NewCompressor(rwComp), seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels), seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int), @@ -545,8 +556,7 @@ func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.Met func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []prompb.MetricMetadata, pBuf *proto.Buffer) error { // Build the WriteRequest with no samples. - comp := createComp() - req, _, err := buildWriteRequest(nil, metadata, pBuf, comp) + req, _, err := buildWriteRequest(nil, metadata, pBuf, t.compressor.GetPooledCompressor()) if err != nil { return err } @@ -1368,7 +1378,7 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { pBuf = proto.NewBuffer(nil) pBufRaw []byte buf []byte - comp = createComp() + comp = s.qm.compressor.GetPooledCompressor() ) if s.qm.sendExemplars { max += int(float64(max) * 0.1) @@ -1432,16 +1442,16 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { case Base1: nPendingSamples, nPendingExemplars, nPendingHistograms := populateTimeSeries(batch, pendingData, s.qm.sendExemplars, s.qm.sendNativeHistograms) n := nPendingSamples + nPendingExemplars + nPendingHistograms - s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) + s.sendSamples(ctx, pendingData[:n], nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, comp) case Min32Optimized: nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeries(&symbolTable, batch, pendingMinimizedData, s.qm.sendExemplars, s.qm.sendNativeHistograms) n := nPendingSamples + nPendingExemplars + nPendingHistograms - s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf) + s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf, comp) symbolTable.clear() case MinLen: nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesLen(&symbolTable, batch, pendingMinLenData, s.qm.sendExemplars, s.qm.sendNativeHistograms) n := nPendingSamples + nPendingExemplars + nPendingHistograms - s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) + s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, comp) symbolTable.clear() } queue.ReturnForReuse(batch) @@ -1464,14 +1474,14 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue *queue) { n := nPendingSamples + nPendingExemplars + nPendingHistograms level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) - s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf) + s.sendMinSamples(ctx, pendingMinimizedData[:n], symbolTable.LabelsString(), nPendingSamples, nPendingExemplars, nPendingHistograms, &pBufRaw, &buf, comp) symbolTable.clear() case MinLen: nPendingSamples, nPendingExemplars, nPendingHistograms := populateMinimizedTimeSeriesLen(&symbolTable, batch, pendingMinLenData, s.qm.sendExemplars, s.qm.sendNativeHistograms) n := nPendingSamples + nPendingExemplars + nPendingHistograms level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending buffered data", "samples", nPendingSamples, "exemplars", nPendingExemplars, "shard", shardNum, "histograms", nPendingHistograms) - s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, &buf) + s.sendMinLenSamples(ctx, pendingMinLenData[:n], symbolTable.LabelsData(), nPendingSamples, nPendingExemplars, nPendingHistograms, pBuf, comp) symbolTable.clear() } } @@ -1532,24 +1542,24 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, s s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin)) } -func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf, buf *[]byte) { +func (s *shards) sendMinSamples(ctx context.Context, samples []prompb.MinimizedTimeSeries, labels string, sampleCount, exemplarCount, histogramCount int, pBuf, buf *[]byte, comp Compression) { begin := time.Now() // Build the ReducedWriteRequest with no metadata. // Failing to build the write request is non-recoverable, since it will // only error if marshaling the proto to bytes fails. - req, highest, err := buildMinimizedWriteRequest(samples, labels, pBuf, buf) + req, highest, err := buildMinimizedWriteRequest(samples, labels, pBuf, buf, comp) if err == nil { err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest) } s.updateMetrics(ctx, err, sampleCount, exemplarCount, histogramCount, time.Since(begin)) } -func (s *shards) sendMinLenSamples(ctx context.Context, samples []prompb.MinimizedTimeSeriesLen, labels []byte, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, buf *[]byte) { +func (s *shards) sendMinLenSamples(ctx context.Context, samples []prompb.MinimizedTimeSeriesLen, labels []byte, sampleCount, exemplarCount, histogramCount int, pBuf *proto.Buffer, comp Compression) { begin := time.Now() // Build the ReducedWriteRequest with no metadata. // Failing to build the write request is non-recoverable, since it will // only error if marshaling the proto to bytes fails. - req, highest, err := buildMinimizedWriteRequestLen(samples, labels, pBuf, buf) + req, highest, err := buildMinimizedWriteRequestLen(samples, labels, pBuf, comp) if err == nil { err = s.sendSamplesWithBackoff(ctx, req, sampleCount, exemplarCount, histogramCount, highest) } @@ -1772,11 +1782,7 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta return buildWriteRequestWithCompression(samples, metadata, pBuf, comp) } -func buildWriteRequestWithCompression(samples []prompb.TimeSeries, - metadata []prompb.MetricMetadata, - pBuf *proto.Buffer, - compressor Compression, -) ([]byte, int64, error) { +func buildWriteRequestWithCompression(samples []prompb.TimeSeries, metadata []prompb.MetricMetadata, pBuf *proto.Buffer, compressor Compression) ([]byte, int64, error) { var highest int64 for _, ts := range samples { // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. @@ -1806,18 +1812,7 @@ func buildWriteRequestWithCompression(samples []prompb.TimeSeries, return nil, highest, err } - // snappy uses len() to see if it needs to allocate a new slice. Make the - // buffer as long as possible. - if buf != nil { - *buf = (*buf)[0:cap(*buf)] - } else { - buf = &[]byte{} - } - compressed := snappy.Encode(*buf, pBuf.Bytes()) - if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) { - // grow the buffer for the next time - *buf = make([]byte, n) - } + compressed, err := compressor.Compress(pBuf.Bytes()) return compressed, highest, err } @@ -1826,48 +1821,6 @@ type offLenPair struct { Len uint32 } -func buildReducedWriteRequest(samples []prompb.ReducedTimeSeries, labels map[uint64]string, pBuf *proto.Buffer, comp Compression) ([]byte, int64, error) { - return buildReducedWriteRequestWithCompression(samples, labels, pBuf, comp) -} - -func buildReducedWriteRequestWithCompression(samples []prompb.ReducedTimeSeries, - labels map[uint64]string, - pBuf *proto.Buffer, - compress Compression, -) ([]byte, int64, error) { - var highest int64 - for _, ts := range samples { - // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. - if len(ts.Samples) > 0 && ts.Samples[0].Timestamp > highest { - highest = ts.Samples[0].Timestamp - } - if len(ts.Exemplars) > 0 && ts.Exemplars[0].Timestamp > highest { - highest = ts.Exemplars[0].Timestamp - } - if len(ts.Histograms) > 0 && ts.Histograms[0].Timestamp > highest { - highest = ts.Histograms[0].Timestamp - } - } - - req := &prompb.WriteRequestWithRefs{ - StringSymbolTable: labels, - Timeseries: samples, - } - - if pBuf == nil { - pBuf = proto.NewBuffer(nil) // For convenience in tests. Not efficient. - } else { - pBuf.Reset() - } - err := pBuf.Marshal(req) - if err != nil { - return nil, 0, err - } - - compressed, err := compress.Compress(pBuf.Bytes()) - return compressed, highest, err -} - type rwSymbolTable struct { symbols []byte symbolsMap map[string]offLenPair @@ -1926,7 +1879,7 @@ func (r *rwSymbolTable) clear() { r.symbols = r.symbols[:0] } -func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf, buf *[]byte) ([]byte, int64, error) { +func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels string, pBuf, buf *[]byte, comp Compression) ([]byte, int64, error) { var highest int64 for _, ts := range samples { // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. @@ -1955,23 +1908,11 @@ func buildMinimizedWriteRequest(samples []prompb.MinimizedTimeSeries, labels str } *pBuf = data - // snappy uses len() to see if it needs to allocate a new slice. Make the - // buffer as long as possible. - if buf != nil { - *buf = (*buf)[0:cap(*buf)] - } else { - buf = &[]byte{} - } - - compressed := snappy.Encode(*buf, data) - if n := snappy.MaxEncodedLen(len(data)); n > len(*buf) { - // grow the buffer for the next time - *buf = make([]byte, n) - } + compressed, err := comp.Compress(*pBuf) return compressed, highest, nil } -func buildMinimizedWriteRequestLen(samples []prompb.MinimizedTimeSeriesLen, labels []byte, pBuf *proto.Buffer, buf *[]byte) ([]byte, int64, error) { +func buildMinimizedWriteRequestLen(samples []prompb.MinimizedTimeSeriesLen, labels []byte, pBuf *proto.Buffer, comp Compression) ([]byte, int64, error) { var highest int64 for _, ts := range samples { // At the moment we only ever append a TimeSeries with a single sample or exemplar in it. @@ -2001,18 +1942,6 @@ func buildMinimizedWriteRequestLen(samples []prompb.MinimizedTimeSeriesLen, labe return nil, 0, err } - // snappy uses len() to see if it needs to allocate a new slice. Make the - // buffer as long as possible. - if buf != nil { - *buf = (*buf)[0:cap(*buf)] - } else { - buf = &[]byte{} - } - - compressed := snappy.Encode(*buf, pBuf.Bytes()) - if n := snappy.MaxEncodedLen(len(pBuf.Bytes())); n > len(*buf) { - // grow the buffer for the next time - *buf = make([]byte, n) - } + compressed, err := comp.Compress(pBuf.Bytes()) return compressed, highest, nil } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index b9c8acd13a..b6b45abb77 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -15,7 +15,6 @@ package remote import ( "bytes" - "compress/flate" "context" "fmt" "math" @@ -32,7 +31,6 @@ import ( "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" - "github.com/klauspost/compress/zstd" "github.com/prometheus/client_golang/prometheus" client_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" @@ -71,13 +69,14 @@ func TestSampleDelivery(t *testing.T) { histograms bool floatHistograms bool rwFormat RemoteWriteFormat + rwComp RemoteWriteCompression }{ - {samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"}, - {samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"}, - {samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"}, - {samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"}, - {samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"}, - + //{samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"}, + //{samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"}, + //{samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"}, + //{samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"}, + //{samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"}, + // {rwFormat: Min32Optimized, samples: true, exemplars: false, histograms: false, name: "interned samples only"}, {rwFormat: Min32Optimized, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "interned samples, exemplars, and histograms"}, {rwFormat: Min32Optimized, samples: false, exemplars: true, histograms: false, name: "interned exemplars only"}, @@ -109,7 +108,7 @@ func TestSampleDelivery(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { dir := t.TempDir() - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.rwFormat) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, tc.rwFormat, tc.rwComp) defer s.Close() var ( @@ -148,6 +147,7 @@ func TestSampleDelivery(t *testing.T) { qm.StoreSeries(series, 0) // Send first half of data. + fmt.Println("Waiting for first half of data") c.expectSamples(samples[:len(samples)/2], series) c.expectExemplars(exemplars[:len(exemplars)/2], series) c.expectHistograms(histograms[:len(histograms)/2], series) @@ -157,6 +157,8 @@ func TestSampleDelivery(t *testing.T) { qm.AppendHistograms(histograms[:len(histograms)/2]) qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2]) c.waitForExpectedData(t) + fmt.Println("got all of first half of data") + fmt.Println("Waiting for second half of data") // Send second half of data. c.expectSamples(samples[len(samples)/2:], series) @@ -168,6 +170,7 @@ func TestSampleDelivery(t *testing.T) { qm.AppendHistograms(histograms[len(histograms)/2:]) qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:]) c.waitForExpectedData(t) + fmt.Println("got all of second half of data") }) } } @@ -181,7 +184,25 @@ func TestMetadataDelivery(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, 0) + m := NewQueueManager(metrics, + nil, + nil, + nil, + dir, + newEWMARate(ewmaWeight, shardUpdateDuration), + cfg, + mcfg, + labels.EmptyLabels(), + nil, + c, + defaultFlushDeadline, + newPool(), + newHighestTimestampMetric(), + nil, + false, + false, + 0, + 0) m.Start() defer m.Stop() @@ -222,7 +243,7 @@ func TestSampleDeliveryTimeout(t *testing.T) { dir := t.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat, 0) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -268,7 +289,7 @@ func TestSampleDeliveryOrder(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat, 0) m.StoreSeries(series, 0) m.Start() @@ -290,7 +311,7 @@ func TestShutdown(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1, Snappy) n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend samples, series := createTimeseries(n, n) m.StoreSeries(series, 0) @@ -328,7 +349,7 @@ func TestSeriesReset(t *testing.T) { cfg := config.DefaultQueueConfig mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, deadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1, Snappy) for i := 0; i < numSegments; i++ { series := []record.RefSeries{} for j := 0; j < numSeries; j++ { @@ -341,6 +362,7 @@ func TestSeriesReset(t *testing.T) { require.Equal(t, numSegments*numSeries/2, len(m.seriesLabels)) } +// doesn't end func TestReshard(t *testing.T) { for _, rwFormat := range []RemoteWriteFormat{Base1, Min32Optimized} { t.Run(fmt.Sprint(rwFormat), func(t *testing.T) { @@ -359,7 +381,7 @@ func TestReshard(t *testing.T) { dir := t.TempDir() metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat, Snappy) m.StoreSeries(series, 0) m.Start() @@ -399,7 +421,7 @@ func TestReshardRaceWithStop(t *testing.T) { go func() { for { metrics := newQueueManagerMetrics(nil, "", "") - m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) + m = NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat, Snappy) m.Start() h.Unlock() h.Lock() @@ -438,7 +460,7 @@ func TestReshardPartialBatch(t *testing.T) { cfg.BatchSendDeadline = model.Duration(batchSendDeadline) metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) + m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat, Snappy) m.StoreSeries(series, 0) m.Start() @@ -487,7 +509,7 @@ func TestQueueFilledDeadlock(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") - m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) + m := NewQueueManager(metrics, nil, nil, nil, t.TempDir(), newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, flushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat, Snappy) m.StoreSeries(series, 0) m.Start() defer m.Stop() @@ -518,7 +540,7 @@ func TestReleaseNoninternedString(t *testing.T) { mcfg := config.DefaultMetadataConfig metrics := newQueueManagerMetrics(nil, "", "") c := NewTestWriteClient(rwFormat) - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, rwFormat, Snappy) m.Start() defer m.Stop() @@ -568,7 +590,7 @@ func TestShouldReshard(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") // todo: test with new proto type(s) client := NewTestWriteClient(Base1) - m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) + m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1, Snappy) m.numShards = c.startingShards m.dataIn.incr(c.samplesIn) m.dataOut.incr(c.samplesOut) @@ -966,7 +988,7 @@ func BenchmarkSampleSend(b *testing.B) { metrics := newQueueManagerMetrics(nil, "", "") // todo: test with new proto type(s) - m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) + m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1, Snappy) m.StoreSeries(series, 0) // These should be received by the client. @@ -1013,7 +1035,7 @@ func BenchmarkStartup(b *testing.B) { // todo: test with new proto type(s) m := NewQueueManager(metrics, nil, nil, logger, dir, newEWMARate(ewmaWeight, shardUpdateDuration), - cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) + cfg, mcfg, labels.EmptyLabels(), nil, c, 1*time.Minute, newPool(), newHighestTimestampMetric(), nil, false, false, Base1, Snappy) m.watcher.SetStartTime(timestamp.Time(math.MaxInt64)) m.watcher.MaxSegment = segments[len(segments)-2] err := m.watcher.Run() @@ -1097,7 +1119,7 @@ func TestCalculateDesiredShards(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) // todo: test with new proto type(s) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1, Snappy) // Need to start the queue manager so the proper metrics are initialized. // However we can stop it right away since we don't need to do any actual @@ -1175,7 +1197,7 @@ func TestCalculateDesiredShardsDetail(t *testing.T) { metrics := newQueueManagerMetrics(nil, "", "") samplesIn := newEWMARate(ewmaWeight, shardUpdateDuration) // todo: test with new proto type(s) - m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1) + m := NewQueueManager(metrics, nil, nil, nil, dir, samplesIn, cfg, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Base1, Snappy) for _, tc := range []struct { name string @@ -1495,67 +1517,67 @@ func BenchmarkBuildWriteRequest(b *testing.B) { }) } -func BenchmarkBuildMinimizedWriteRequest(b *testing.B) { +//func BenchmarkBuildMinimizedWriteRequest(b *testing.B) { +// +// type testcase struct { +// batch []timeSeries +// } +// testCases := []testcase{ +// {createDummyTimeSeries(2)}, +// {createDummyTimeSeries(10)}, +// {createDummyTimeSeries(100)}, +// } +// for _, tc := range testCases { +// symbolTable := newRwSymbolTable() +// seriesBuff := make([]prompb.MinimizedTimeSeries, len(tc.batch)) +// for i := range seriesBuff { +// seriesBuff[i].Samples = []prompb.Sample{{}} +// seriesBuff[i].Exemplars = []prompb.Exemplar{{}} +// } +// pBuf := []byte{} +// +// // Warmup buffers +// for i := 0; i < 10; i++ { +// populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) +// buildMinimizedWriteRequest(seriesBuff, symbolTable.LabelsString(), &pBuf, &buff) +// } +// +// b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) { +// totalSize := 0 +// for j := 0; j < b.N; j++ { +// populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) +// b.ResetTimer() +// req, _, err := buildMinimizedWriteRequest(seriesBuff, symbolTable.LabelsString(), &pBuf, &buff) +// if err != nil { +// b.Fatal(err) +// } +// symbolTable.clear() +// totalSize += len(req) +// b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op") +// } +// }) +// } +//} - type testcase struct { - batch []timeSeries - } - testCases := []testcase{ - {createDummyTimeSeries(2)}, - {createDummyTimeSeries(10)}, - {createDummyTimeSeries(100)}, - } - for _, tc := range testCases { - symbolTable := newRwSymbolTable() - seriesBuff := make([]prompb.MinimizedTimeSeries, len(tc.batch)) - for i := range seriesBuff { - seriesBuff[i].Samples = []prompb.Sample{{}} - seriesBuff[i].Exemplars = []prompb.Exemplar{{}} - } - pBuf := []byte{} +//func makeUncompressedReducedWriteRequestBenchData(b testing.TB) []byte { +// data := createDummyTimeSeries(1000) +// pool := newLookupPool() +// pBuf := proto.NewBuffer(nil) +// seriesBuff := make([]prompb.ReducedTimeSeries, len(data)) +// for i := range seriesBuff { +// seriesBuff[i].Samples = []prompb.Sample{{}} +// seriesBuff[i].Exemplars = []prompb.ExemplarRef{{}} +// } +// +// populateReducedTimeSeries(pool, data, seriesBuff, true, true) +// res, _, err := buildReducedWriteRequestWithCompression(seriesBuff, pool.getTable(), pBuf, &noopCompression{}) +// if err != nil { +// b.Fatal(err) +// } +// return res +//} - // Warmup buffers - for i := 0; i < 10; i++ { - populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) - buildMinimizedWriteRequest(seriesBuff, symbolTable.LabelsString(), &pBuf, &buff) - } - - b.Run(fmt.Sprintf("%d-instances", len(tc.batch)), func(b *testing.B) { - totalSize := 0 - for j := 0; j < b.N; j++ { - populateMinimizedTimeSeries(&symbolTable, tc.batch, seriesBuff, true, true) - b.ResetTimer() - req, _, err := buildMinimizedWriteRequest(seriesBuff, symbolTable.LabelsString(), &pBuf, &buff) - if err != nil { - b.Fatal(err) - } - symbolTable.clear() - totalSize += len(req) - b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op") - } - }) - } -} - -func makeUncompressedReducedWriteRequestBenchData(b testing.TB) []byte { - data := createDummyTimeSeries(1000) - pool := newLookupPool() - pBuf := proto.NewBuffer(nil) - seriesBuff := make([]prompb.ReducedTimeSeries, len(data)) - for i := range seriesBuff { - seriesBuff[i].Samples = []prompb.Sample{{}} - seriesBuff[i].Exemplars = []prompb.ExemplarRef{{}} - } - - populateReducedTimeSeries(pool, data, seriesBuff, true, true) - res, _, err := buildReducedWriteRequestWithCompression(seriesBuff, pool.getTable(), pBuf, &noopCompression{}) - if err != nil { - b.Fatal(err) - } - return res -} - -func makeUncompressedWriteRequestBenchData(b *testing.B) []byte { +func makeUncompressedWriteRequestBenchData() ([]byte, error) { data := createDummyTimeSeries(1000) seriesBuff := make([]prompb.TimeSeries, len(data)) for i := range seriesBuff { @@ -1567,34 +1589,37 @@ func makeUncompressedWriteRequestBenchData(b *testing.B) []byte { populateTimeSeries(data, seriesBuff, true, true) res, _, err := buildWriteRequestWithCompression(seriesBuff, nil, pBuf, &noopCompression{}) if err != nil { - b.Fatal(err) + return nil, err } - return res + return res, nil } func BenchmarkCompressWriteRequest(b *testing.B) { - uncompV1 := makeUncompressedWriteRequestBenchData(b) - uncompV11 := makeUncompressedReducedWriteRequestBenchData(b) + uncompV1, err := makeUncompressedWriteRequestBenchData() + require.NoError(b, err) + //uncompV11 := makeUncompressedReducedWriteRequestBenchData(b) // buf := make([]byte, 0) - bench := func(b *testing.B, data []byte, comp Compression) { + bench := func(b *testing.B, name string, data []byte, comp Compression) { b.ResetTimer() totalSize := 0 var res []byte var err error for i := 0; i < b.N; i++ { + fmt.Println("data len: ", len(data)) res, err = comp.Compress(data) if err != nil { b.Fatal(err) } totalSize += len(res) + fmt.Println("compressed len:", len(res)) b.ReportMetric(float64(totalSize)/float64(b.N), "compressedSize/op") } b.StopTimer() // sanity check res, err = comp.Decompress(res) if err != nil { - b.Fatal(err) + b.Fatal(err, fmt.Sprint("compression: ", name)) } if !bytes.Equal(res, data) { b.Fatalf("decompressed data doesn't match original") @@ -1604,43 +1629,44 @@ func BenchmarkCompressWriteRequest(b *testing.B) { cases := []struct { name string data []byte - comp Compression + comp Compressor }{ - {"v1-go-snappy", uncompV1, &snappyCompression{}}, - {"v1-snappy", uncompV1, &snappyAltCompression{}}, - {"v1-s2", uncompV1, &s2Compression{}}, - {"v1-ZstdFastest", uncompV1, &zstdCompression{level: zstd.SpeedFastest}}, - {"v1-ZstdSpeedDef", uncompV1, &zstdCompression{level: zstd.SpeedDefault}}, - {"v1-ZstdBestComp", uncompV1, &zstdCompression{level: zstd.SpeedBestCompression}}, - {"v1-Lzw", uncompV1, &lzwCompression{}}, - {"v1-FlateBestComp", uncompV1, &flateCompression{level: flate.BestCompression}}, - {"v1-FlateBestSpeed", uncompV1, &flateCompression{level: flate.BestSpeed}}, - {"v1-Brotli-1", uncompV1, &brotliCompression{quality: 1}}, - {"v1-Brotli-11", uncompV1, &brotliCompression{quality: 1}}, - {"v1-Brotli-5", uncompV1, &brotliCompression{quality: 5}}, + //{"v1-go-snappy", uncompV1, &snappyCompression{}}, + //{"v1-snappy", uncompV1, &snappyAltCompression{}}, + {"v1-s2", uncompV1, NewCompressor(Snappy)}, + //{"v1-ZstdFastest", uncompV1, &zstdCompression{level: zstd.SpeedFastest}}, + //{"v1-ZstdSpeedDef", uncompV1, &zstdCompression{level: zstd.SpeedDefault}}, + //{"v1-ZstdBestComp", uncompV1, &zstdCompression{level: zstd.SpeedBestCompression}}, + //{"v1-Lzw", uncompV1, &lzwCompression{}}, + //{"v1-FlateBestComp", uncompV1, &flateCompression{level: flate.BestCompression}}, + //{"v1-FlateBestSpeed", uncompV1, &flateCompression{level: flate.BestSpeed}}, + //{"v1-Brotli-1", uncompV1, &brotliCompression{quality: 1}}, + //{"v1-Brotli-11", uncompV1, &brotliCompression{quality: 1}}, + //{"v1-Brotli-5", uncompV1, &brotliCompression{quality: 5}}, - {"v1.1-go-snappy", uncompV11, &snappyCompression{}}, - {"v1.1-snappy", uncompV11, &snappyAltCompression{}}, - {"v1.1-s2", uncompV11, &s2Compression{}}, - {"v1.1-ZstdFastest", uncompV11, &zstdCompression{level: zstd.SpeedFastest}}, - {"v1.1-ZstdSpeedDef", uncompV11, &zstdCompression{level: zstd.SpeedDefault}}, - {"v1.1-ZstdBestComp", uncompV11, &zstdCompression{level: zstd.SpeedBestCompression}}, - {"v1.1-Lzw", uncompV11, &lzwCompression{}}, - {"v1.1-FlateBestComp", uncompV11, &flateCompression{level: flate.BestCompression}}, - {"v1.1-FlateBestSpeed", uncompV11, &flateCompression{level: flate.BestSpeed}}, - {"v1.1-Brotli-1", uncompV11, &brotliCompression{quality: 1}}, - {"v1.1-Brotli-11", uncompV11, &brotliCompression{quality: 1}}, - {"v1.1-Brotli-5", uncompV11, &brotliCompression{quality: 5}}, + //{"v1.1-go-snappy", uncompV11, &snappyCompression{}}, + //{"v1.1-snappy", uncompV11, &snappyAltCompression{}}, + //{"v1.1-s2", uncompV11, &s2Compression{}}, + //{"v1.1-ZstdFastest", uncompV11, &zstdCompression{level: zstd.SpeedFastest}}, + //{"v1.1-ZstdSpeedDef", uncompV11, &zstdCompression{level: zstd.SpeedDefault}}, + //{"v1.1-ZstdBestComp", uncompV11, &zstdCompression{level: zstd.SpeedBestCompression}}, + //{"v1.1-Lzw", uncompV11, &lzwCompression{}}, + //{"v1.1-FlateBestComp", uncompV11, &flateCompression{level: flate.BestCompression}}, + //{"v1.1-FlateBestSpeed", uncompV11, &flateCompression{level: flate.BestSpeed}}, + //{"v1.1-Brotli-1", uncompV11, &brotliCompression{quality: 1}}, + //{"v1.1-Brotli-11", uncompV11, &brotliCompression{quality: 1}}, + //{"v1.1-Brotli-5", uncompV11, &brotliCompression{quality: 5}}, } // Warmup buffers for _, c := range cases { - bench(b, c.data, c.comp) + bench(b, c.name, c.data, c.comp) } + fmt.Println("done warm up") for _, c := range cases { b.Run(c.name, func(b *testing.B) { - bench(b, c.data, c.comp) + bench(b, c.name, c.data, c.comp) }) } } diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go index 5897642d6b..52ebc55c6d 100644 --- a/storage/remote/read_test.go +++ b/storage/remote/read_test.go @@ -92,7 +92,7 @@ func TestNoDuplicateReadConfigs(t *testing.T) { for _, tc := range cases { t.Run("", func(t *testing.T) { // todo: test with new format type(s)? - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteReadConfigs: tc.cfgs, diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 63abe30ba9..6522ce7f30 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -62,7 +62,7 @@ type Storage struct { } // NewStorage returns a remote.Storage. -func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, rwFormat RemoteWriteFormat) *Storage { +func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration, sm ReadyScrapeManager, rwFormat RemoteWriteFormat, rwCompression RemoteWriteCompression) *Storage { if l == nil { l = log.NewNopLogger() } @@ -72,7 +72,7 @@ func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCal logger: logger, localStartTimeCallback: stCallback, } - s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, rwFormat) + s.rws = NewWriteStorage(s.logger, reg, walDir, flushDeadline, sm, rwFormat, rwCompression) return s } diff --git a/storage/remote/storage_test.go b/storage/remote/storage_test.go index 59826f0128..dee7a073de 100644 --- a/storage/remote/storage_test.go +++ b/storage/remote/storage_test.go @@ -28,7 +28,7 @@ func TestStorageLifecycle(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{ @@ -56,7 +56,7 @@ func TestUpdateRemoteReadConfigs(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy) conf := &config.Config{ GlobalConfig: config.GlobalConfig{}, @@ -78,7 +78,7 @@ func TestFilterExternalLabels(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy) conf := &config.Config{ GlobalConfig: config.GlobalConfig{ @@ -104,7 +104,7 @@ func TestIgnoreExternalLabels(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1) + s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy) conf := &config.Config{ GlobalConfig: config.GlobalConfig{ diff --git a/storage/remote/write.go b/storage/remote/write.go index 733ea6fdfb..38bb7956fd 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -66,6 +66,7 @@ type WriteStorage struct { dir string queues map[string]*QueueManager rwFormat RemoteWriteFormat + rwComp RemoteWriteCompression samplesIn *ewmaRate flushDeadline time.Duration interner *pool @@ -77,13 +78,14 @@ type WriteStorage struct { } // NewWriteStorage creates and runs a WriteStorage. -func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, rwFormat RemoteWriteFormat) *WriteStorage { +func NewWriteStorage(logger log.Logger, reg prometheus.Registerer, dir string, flushDeadline time.Duration, sm ReadyScrapeManager, rwFormat RemoteWriteFormat, rwCompression RemoteWriteCompression) *WriteStorage { if logger == nil { logger = log.NewNopLogger() } rws := &WriteStorage{ queues: make(map[string]*QueueManager), rwFormat: rwFormat, + rwComp: rwCompression, watcherMetrics: wlog.NewWatcherMetrics(reg), liveReaderMetrics: wlog.NewLiveReaderMetrics(reg), logger: logger, @@ -201,6 +203,7 @@ func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { rwConf.SendExemplars, rwConf.SendNativeHistograms, rws.rwFormat, + rws.rwComp, ) // Keep track of which queues are new so we know which to start. newHashes = append(newHashes, hash) diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 09ad8422c9..a23062f5c2 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -48,15 +48,18 @@ type writeHandler struct { // The handler will accept the new format, but it can still accept the old one // TODO: this should eventually be via content negotiation rwFormat RemoteWriteFormat + //rwComp RemoteWriteCompression + comp Compressor } // NewWriteHandler creates a http.Handler that accepts remote write requests and // writes them to the provided appendable. -func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, rwFormat RemoteWriteFormat) http.Handler { +func NewWriteHandler(logger log.Logger, reg prometheus.Registerer, appendable storage.Appendable, rwFormat RemoteWriteFormat, rwComp RemoteWriteCompression) http.Handler { h := &writeHandler{ logger: logger, appendable: appendable, rwFormat: rwFormat, + comp: NewCompressor(rwComp), samplesWithInvalidLabelsTotal: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: "prometheus", Subsystem: "api", @@ -79,11 +82,11 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // TODO: this should eventually be done via content negotiation/looking at the header switch h.rwFormat { case Base1: - req, err = DecodeWriteRequest(r.Body) - case Min32Optimized: - reqMin, err = DecodeMinimizedWriteRequest(r.Body) - case MinLen: - reqMinLen, err = DecodeMinimizedWriteRequestLen(r.Body) + req, err = DecodeWriteRequest(r.Body, &h.comp) + //case Min32Optimized: + // reqMin, err = DecodeMinimizedWriteRequest(r.Body, &h.comp) + //case MinLen: + // reqMinLen, err = DecodeMinimizedWriteRequestLen(r.Body) } if err != nil { diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index 9f8068e9c0..c3f0600105 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -38,7 +38,8 @@ import ( ) func TestRemoteWriteHandler(t *testing.T) { - buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) + c := NewCompressor(Snappy) + buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, &c) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -46,7 +47,7 @@ func TestRemoteWriteHandler(t *testing.T) { appendable := &mockAppendable{} // TODO: test with other proto format(s) - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1, Snappy) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -84,61 +85,62 @@ func TestRemoteWriteHandler(t *testing.T) { } } -func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) { - buf, _, err := buildMinimizedWriteRequest(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil) - require.NoError(t, err) - - req, err := http.NewRequest("", "", bytes.NewReader(buf)) - req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue) - require.NoError(t, err) - - appendable := &mockAppendable{} - // TODO: test with other proto format(s) - handler := NewWriteHandler(nil, nil, appendable, Min32Optimized) - - recorder := httptest.NewRecorder() - handler.ServeHTTP(recorder, req) - - resp := recorder.Result() - require.Equal(t, http.StatusNoContent, resp.StatusCode) - - i := 0 - j := 0 - k := 0 - // the reduced write request is equivalent to the write request fixture. - // we can use it for - for _, ts := range writeRequestFixture.Timeseries { - ls := labelProtosToLabels(ts.Labels) - for _, s := range ts.Samples { - require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) - i++ - } - - for _, e := range ts.Exemplars { - exemplarLabels := labelProtosToLabels(e.Labels) - require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) - j++ - } - - for _, hp := range ts.Histograms { - if hp.IsFloatHistogram() { - fh := FloatHistogramProtoToFloatHistogram(hp) - require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) - } else { - h := HistogramProtoToHistogram(hp) - require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) - } - - k++ - } - } -} +//func TestRemoteWriteHandlerMinimizedFormat(t *testing.T) { +// buf, _, err := buildMinimizedWriteRequest(writeRequestMinimizedFixture.Timeseries, writeRequestMinimizedFixture.Symbols, nil, nil) +// require.NoError(t, err) +// +// req, err := http.NewRequest("", "", bytes.NewReader(buf)) +// req.Header.Set(RemoteWriteVersionHeader, RemoteWriteVersion11HeaderValue) +// require.NoError(t, err) +// +// appendable := &mockAppendable{} +// // TODO: test with other proto format(s) +// handler := NewWriteHandler(nil, nil, appendable, Min32Optimized) +// +// recorder := httptest.NewRecorder() +// handler.ServeHTTP(recorder, req) +// +// resp := recorder.Result() +// require.Equal(t, http.StatusNoContent, resp.StatusCode) +// +// i := 0 +// j := 0 +// k := 0 +// // the reduced write request is equivalent to the write request fixture. +// // we can use it for +// for _, ts := range writeRequestFixture.Timeseries { +// ls := labelProtosToLabels(ts.Labels) +// for _, s := range ts.Samples { +// require.Equal(t, mockSample{ls, s.Timestamp, s.Value}, appendable.samples[i]) +// i++ +// } +// +// for _, e := range ts.Exemplars { +// exemplarLabels := labelProtosToLabels(e.Labels) +// require.Equal(t, mockExemplar{ls, exemplarLabels, e.Timestamp, e.Value}, appendable.exemplars[j]) +// j++ +// } +// +// for _, hp := range ts.Histograms { +// if hp.IsFloatHistogram() { +// fh := FloatHistogramProtoToFloatHistogram(hp) +// require.Equal(t, mockHistogram{ls, hp.Timestamp, nil, fh}, appendable.histograms[k]) +// } else { +// h := HistogramProtoToHistogram(hp) +// require.Equal(t, mockHistogram{ls, hp.Timestamp, h, nil}, appendable.histograms[k]) +// } +// +// k++ +// } +// } +//} func TestOutOfOrderSample(t *testing.T) { + c := NewCompressor(Snappy) buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Samples: []prompb.Sample{{Value: 1, Timestamp: 0}}, - }}, nil, nil, nil) + }}, nil, nil, &c) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -148,7 +150,7 @@ func TestOutOfOrderSample(t *testing.T) { latestSample: 100, } // TODO: test with other proto format(s) - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1, Snappy) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -161,10 +163,11 @@ func TestOutOfOrderSample(t *testing.T) { // don't fail on ingestion errors since the exemplar storage is // still experimental. func TestOutOfOrderExemplar(t *testing.T) { + c := NewCompressor(Snappy) buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Exemplars: []prompb.Exemplar{{Labels: []prompb.Label{{Name: "foo", Value: "bar"}}, Value: 1, Timestamp: 0}}, - }}, nil, nil, nil) + }}, nil, nil, &c) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -174,7 +177,7 @@ func TestOutOfOrderExemplar(t *testing.T) { latestExemplar: 100, } // TODO: test with other proto format(s) - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1, Snappy) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -185,10 +188,11 @@ func TestOutOfOrderExemplar(t *testing.T) { } func TestOutOfOrderHistogram(t *testing.T) { + c := NewCompressor(Snappy) buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ Labels: []prompb.Label{{Name: "__name__", Value: "test_metric"}}, Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram), FloatHistogramToHistogramProto(1, testHistogram.ToFloat())}, - }}, nil, nil, nil) + }}, nil, nil, &c) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -198,7 +202,7 @@ func TestOutOfOrderHistogram(t *testing.T) { latestHistogram: 100, } // TODO: test with other proto format(s) - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1, Snappy) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -210,6 +214,8 @@ func TestOutOfOrderHistogram(t *testing.T) { func BenchmarkRemoteWritehandler(b *testing.B) { const labelValue = "abcdefg'hijlmn234!@#$%^&*()_+~`\"{}[],./<>?hello0123hiOlá你好Dzieńdobry9Zd8ra765v4stvuyte" var reqs []*http.Request + c := NewCompressor(Snappy) + for i := 0; i < b.N; i++ { num := strings.Repeat(strconv.Itoa(i), 16) buf, _, err := buildWriteRequest([]prompb.TimeSeries{{ @@ -218,7 +224,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) { {Name: "test_label_name_" + num, Value: labelValue + num}, }, Histograms: []prompb.Histogram{HistogramToHistogramProto(0, &testHistogram)}, - }}, nil, nil, nil) + }}, nil, nil, &c) require.NoError(b, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) require.NoError(b, err) @@ -227,7 +233,7 @@ func BenchmarkRemoteWritehandler(b *testing.B) { appendable := &mockAppendable{} // TODO: test with other proto format(s) - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1, Snappy) recorder := httptest.NewRecorder() b.ResetTimer() @@ -237,7 +243,9 @@ func BenchmarkRemoteWritehandler(b *testing.B) { } func TestCommitErr(t *testing.T) { - buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, nil) + c := NewCompressor(Snappy) + + buf, _, err := buildWriteRequest(writeRequestFixture.Timeseries, nil, nil, &c) require.NoError(t, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) @@ -247,7 +255,7 @@ func TestCommitErr(t *testing.T) { commitErr: fmt.Errorf("commit error"), } // TODO: test with other proto format(s) - handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1) + handler := NewWriteHandler(log.NewNopLogger(), nil, appendable, Base1, Snappy) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -273,9 +281,10 @@ func BenchmarkRemoteWriteOOOSamples(b *testing.B) { require.NoError(b, db.Close()) }) // TODO: test with other proto format(s) - handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), Base1) + handler := NewWriteHandler(log.NewNopLogger(), nil, db.Head(), Base1, Snappy) - buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, nil) + c := NewCompressor(Snappy) + buf, _, err := buildWriteRequest(genSeriesWithSample(1000, 200*time.Minute.Milliseconds()), nil, nil, &c) require.NoError(b, err) req, err := http.NewRequest("", "", bytes.NewReader(buf)) diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index 34cd9b90a5..d42379d24d 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -118,7 +118,7 @@ func TestNoDuplicateWriteConfigs(t *testing.T) { for _, tc := range cases { // todo: test with new format type(s) - s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, Base1) + s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, Base1, Snappy) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: tc.cfgs, @@ -141,7 +141,7 @@ func TestRestartOnNameChange(t *testing.T) { require.NoError(t, err) // todo: test with new format type(s) - s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, Base1) + s := NewWriteStorage(nil, nil, dir, time.Millisecond, nil, Base1, Snappy) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, @@ -167,7 +167,7 @@ func TestUpdateWithRegisterer(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, Base1) + s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Millisecond, nil, Base1, Snappy) c1 := &config.RemoteWriteConfig{ Name: "named", URL: &common_config.URL{ @@ -208,7 +208,7 @@ func TestWriteStorageLifecycle(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1) + s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy) conf := &config.Config{ GlobalConfig: config.DefaultGlobalConfig, RemoteWriteConfigs: []*config.RemoteWriteConfig{ @@ -226,7 +226,7 @@ func TestUpdateExternalLabels(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, Base1) + s := NewWriteStorage(nil, prometheus.NewRegistry(), dir, time.Second, nil, Base1, Snappy) externalLabels := labels.FromStrings("external", "true") conf := &config.Config{ @@ -256,7 +256,7 @@ func TestWriteStorageApplyConfigsIdempotent(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1) + s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy) conf := &config.Config{ GlobalConfig: config.GlobalConfig{}, RemoteWriteConfigs: []*config.RemoteWriteConfig{ @@ -282,7 +282,7 @@ func TestWriteStorageApplyConfigsPartialUpdate(t *testing.T) { dir := t.TempDir() // todo: test with new format type(s) - s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1) + s := NewWriteStorage(nil, nil, dir, defaultFlushDeadline, nil, Base1, Snappy) c0 := &config.RemoteWriteConfig{ RemoteTimeout: model.Duration(10 * time.Second), diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 7594b65472..759f5fae34 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -254,6 +254,7 @@ func NewAPI( statsRenderer StatsRenderer, rwEnabled bool, rwFormat remote.RemoteWriteFormat, + rwComp remote.RemoteWriteCompression, otlpEnabled bool, ) *API { a := &API{ @@ -296,7 +297,7 @@ func NewAPI( } if rwEnabled { - a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, rwFormat) + a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, rwFormat, rwComp) } if otlpEnabled { a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap) diff --git a/web/web.go b/web/web.go index 6430166f4a..8847ba48a1 100644 --- a/web/web.go +++ b/web/web.go @@ -264,6 +264,7 @@ type Options struct { IsAgent bool AppName string RemoteWriteFormat remote.RemoteWriteFormat + RemoteWriteCompression remote.RemoteWriteCompression Gatherer prometheus.Gatherer Registerer prometheus.Registerer @@ -354,6 +355,7 @@ func New(logger log.Logger, o *Options) *Handler { nil, o.EnableRemoteWriteReceiver, o.RemoteWriteFormat, + o.RemoteWriteCompression, o.EnableOTLPWriteReceiver, )