diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 8f1e9bc54a..13d56de7bf 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -519,7 +519,12 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper if numSamples == samplesPerChunk/4 { s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) } - if t >= s.nextAt { + // If numSamples > samplesPerChunk*2 then our previous prediction was invalid, + // most likely because samples rate has changed and now they are arriving more frequently. + // Since we assume that the rate is higher, we're being conservative and cutting at 2*samplesPerChunk + // as we expect more chunks to come. + // Note that next chunk will have its nextAt recalculated for the new rate. + if t >= s.nextAt || numSamples >= samplesPerChunk*2 { c = s.cutNewHeadChunk(t, chunkDiskMapper) chunkCreated = true } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 54dc82cf94..7cd072a192 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1308,6 +1308,50 @@ func TestMemSeries_append(t *testing.T) { } } +func TestMemSeries_append_atVariableRate(t *testing.T) { + const samplesPerChunk = 120 + dir := t.TempDir() + // This is usually taken from the Head, but passing manually here. + chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, chunkDiskMapper.Close()) + }) + + s := newMemSeries(labels.Labels{}, 1, DefaultBlockDuration, nil, defaultIsolationDisabled) + + // At this slow rate, we will fill the chunk in two block durations. + slowRate := (DefaultBlockDuration * 2) / samplesPerChunk + + var nextTs int64 + var totalAppendedSamples int + for i := 0; i < samplesPerChunk/4; i++ { + ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper) + require.Truef(t, ok, "slow sample %d was not appended", i) + nextTs += slowRate + totalAppendedSamples++ + } + require.Equal(t, DefaultBlockDuration, s.nextAt, "after appending a samplesPerChunk/4 samples at a slow rate, we should aim to cut a new block at the default block duration %d, but it's set to %d", DefaultBlockDuration, s.nextAt) + + // Suddenly, the rate increases and we receive a sample every millisecond. + for i := 0; i < math.MaxUint16; i++ { + ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper) + require.Truef(t, ok, "quick sample %d was not appended", i) + nextTs++ + totalAppendedSamples++ + } + ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper) + require.True(t, ok, "new chunk sample was not appended") + require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk") + + var totalSamplesInChunks int + for i, c := range s.mmappedChunks { + totalSamplesInChunks += int(c.numSamples) + require.LessOrEqualf(t, c.numSamples, uint16(2*samplesPerChunk), "mmapped chunk %d has more than %d samples", i, 2*samplesPerChunk) + } + require.Equal(t, totalAppendedSamples, totalSamplesInChunks, "wrong number of samples in %d mmapped chunks", len(s.mmappedChunks)) +} + func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. h, _ := newTestHead(t, 1000, false)