diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 326efe6887..5ef1fa311b 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -236,6 +236,7 @@ Outer: seriesPool.Put(v) case []record.RefSample: samples := v + minValidTime := h.minValidTime.Load() // We split up the samples into chunks of 5000 samples or less. // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise // cause thousands of very large in flight buffers occupying large amounts @@ -249,6 +250,9 @@ Outer: shards[i] = processors[i].reuseBuf() } for _, sam := range samples[:m] { + if sam.T < minValidTime { + continue // Before minValidTime: discard. + } if r, ok := multiRef[sam.Ref]; ok { sam.Ref = r } @@ -412,11 +416,9 @@ func (wp *walSubsetProcessor) reuseBuf() []record.RefSample { // processWALSamples adds the samples it receives to the head and passes // the buffer received to an output channel for reuse. -// Samples before the minValidTime timestamp are discarded. func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks map[chunks.HeadSeriesRef][]*mmappedChunk) (unknownRefs, mmapOverlappingChunks uint64) { defer close(wp.output) - minValidTime := h.minValidTime.Load() mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) for in := range wp.input { @@ -429,9 +431,6 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks map[chunk } for _, s := range in.samples { - if s.T < minValidTime { - continue - } ms := h.series.getByID(s.Ref) if ms == nil { unknownRefs++