diff --git a/tsdb/chunkenc/bstream.go b/tsdb/chunkenc/bstream.go
index 833c9794b6..60531023ba 100644
--- a/tsdb/chunkenc/bstream.go
+++ b/tsdb/chunkenc/bstream.go
@@ -119,11 +119,18 @@ type bstreamReader struct {
 	buffer uint64 // The current buffer, filled from the stream, containing up to 8 bytes from which read bits.
 	valid  uint8  // The number of right-most bits valid to read (from left) in the current 8 byte buffer.
+	last   byte   // A copy of the last byte of the stream.
 }
 
 func newBReader(b []byte) bstreamReader {
+	// The last byte of the stream can be updated later, so we take a copy.
+	var last byte
+	if len(b) > 0 {
+		last = b[len(b)-1]
+	}
 	return bstreamReader{
 		stream: b,
+		last:   last,
 	}
 }
@@ -223,17 +230,24 @@ func (b *bstreamReader) loadNextBuffer(nbits uint8) bool {
 		return true
 	}
 
-	// We're here if the are 8 or less bytes left in the stream. Since this reader needs
-	// to handle race conditions with concurrent writes happening on the very last byte
-	// we make sure to never over more than the minimum requested bits (rounded up to
-	// the next byte). The following code is slower but called less frequently.
+	// We're here if there are 8 or fewer bytes left in the stream.
+	// The following code is slower but called less frequently.
 	nbytes := int((nbits / 8) + 1)
 	if b.streamOffset+nbytes > len(b.stream) {
 		nbytes = len(b.stream) - b.streamOffset
 	}
 
 	buffer := uint64(0)
-	for i := 0; i < nbytes; i++ {
+	skip := 0
+	if b.streamOffset+nbytes == len(b.stream) {
+		// There can be concurrent writes happening on the very last byte
+		// of the stream, so use the copy we took at initialization time.
+		buffer = buffer | uint64(b.last)
+		// Read up to the byte before the last one.
+		skip = 1
+	}
+
+	for i := 0; i < nbytes-skip; i++ {
 		buffer = buffer | (uint64(b.stream[b.streamOffset+i]) << uint(8*(nbytes-i-1)))
 	}
diff --git a/tsdb/chunkenc/xor.go b/tsdb/chunkenc/xor.go
index 716f0698f0..3429c2c606 100644
--- a/tsdb/chunkenc/xor.go
+++ b/tsdb/chunkenc/xor.go
@@ -88,6 +88,8 @@ func (c *XORChunk) Compact() {
 }
 
 // Appender implements the Chunk interface.
+// It is not valid to call Appender() multiple times concurrently or to use multiple
+// Appenders on the same chunk.
 func (c *XORChunk) Appender() (Appender, error) {
 	it := c.iterator(nil)
@@ -115,9 +117,6 @@ func (c *XORChunk) Appender() (Appender, error) {
 }
 
 func (c *XORChunk) iterator(it Iterator) *xorIterator {
-	// Should iterators guarantee to act on a copy of the data so it doesn't lock append?
-	// When using striped locks to guard access to chunks, probably yes.
-	// Could only copy data if the chunk is not completed yet.
 	if xorIter, ok := it.(*xorIterator); ok {
 		xorIter.Reset(c.b.bytes())
 		return xorIter
@@ -132,6 +131,9 @@ func (c *XORChunk) iterator(it Iterator) *xorIterator {
 }
 
 // Iterator implements the Chunk interface.
+// Iterator() must not be called concurrently with any modifications to the chunk,
+// but after it returns you can use an Iterator concurrently with an Appender or
+// other Iterators.
 func (c *XORChunk) Iterator(it Iterator) Iterator {
 	return c.iterator(it)
 }
diff --git a/tsdb/head.go b/tsdb/head.go
index 90cfacf79f..8dd1511639 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -1803,9 +1803,8 @@ type memSeries struct {
 
 	nextAt int64 // Timestamp at which to cut the next chunk.
 
-	// We keep the last 4 samples here (in addition to appending them to the chunk) so we don't need coordination between appender and querier.
-	// Even the most compact encoding of a sample takes 2 bits, so the last byte is not contended.
-	sampleBuf [4]sample
+	// We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates.
+	lastValue float64
 
 	// Current appender for the head chunk. Set when a new head chunk is cut.
 	// It is nil only if headChunk is nil. E.g. if there was an appender that created a new series, but rolled back the commit
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index 067281cc46..f843aa1ec6 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -362,7 +362,7 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi
 		// like federation and erroring out at that time would be extremely noisy.
 		// This only checks against the latest in-order sample.
 		// The OOO headchunk has its own method to detect these duplicates.
-		if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
+		if math.Float64bits(s.lastValue) != math.Float64bits(v) {
 			return false, 0, storage.ErrDuplicateSampleForTimestamp
 		}
 		// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
@@ -800,11 +800,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
 	s.app.Append(t, v)
 
 	c.maxTime = t
-
-	s.sampleBuf[0] = s.sampleBuf[1]
-	s.sampleBuf[1] = s.sampleBuf[2]
-	s.sampleBuf[2] = s.sampleBuf[3]
-	s.sampleBuf[3] = sample{t: t, v: v}
+	s.lastValue = v
 
 	if appendID > 0 && s.txs != nil {
 		s.txs.add(appendID)
diff --git a/tsdb/head_read.go b/tsdb/head_read.go
index 0fe24792ab..a97537a1fe 100644
--- a/tsdb/head_read.go
+++ b/tsdb/head_read.go
@@ -677,87 +677,25 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch
 	if stopAfter == 0 {
 		return chunkenc.NewNopIterator()
 	}
-
-	if int(id)-int(s.firstChunkID) < len(s.mmappedChunks) {
-		if stopAfter == numSamples {
-			return c.chunk.Iterator(it)
-		}
-		if msIter, ok := it.(*stopIterator); ok {
-			msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
-			msIter.i = -1
-			msIter.stopAfter = stopAfter
-			return msIter
-		}
-		return &stopIterator{
-			Iterator:  c.chunk.Iterator(it),
-			i:         -1,
-			stopAfter: stopAfter,
-		}
-	}
-	// Serve the last 4 samples for the last chunk from the sample buffer
-	// as their compressed bytes may be mutated by added samples.
-	if msIter, ok := it.(*memSafeIterator); ok {
-		msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
-		msIter.i = -1
-		msIter.total = numSamples
-		msIter.stopAfter = stopAfter
-		msIter.buf = s.sampleBuf
-		return msIter
-	}
-	return &memSafeIterator{
-		stopIterator: stopIterator{
-			Iterator:  c.chunk.Iterator(it),
-			i:         -1,
-			stopAfter: stopAfter,
-		},
-		total: numSamples,
-		buf:   s.sampleBuf,
+	if stopAfter == numSamples {
+		return c.chunk.Iterator(it)
 	}
+	return makeStopIterator(c.chunk, it, stopAfter)
 }
 
-// memSafeIterator returns values from the wrapped stopIterator
-// except the last 4, which come from buf.
-type memSafeIterator struct {
-	stopIterator
-
-	total int
-	buf   [4]sample
-}
-
-func (it *memSafeIterator) Seek(t int64) bool {
-	if it.Err() != nil {
-		return false
+func makeStopIterator(c chunkenc.Chunk, it chunkenc.Iterator, stopAfter int) chunkenc.Iterator {
+	// Re-use the Iterator object if it is a stopIterator.
+	if stopIter, ok := it.(*stopIterator); ok {
+		stopIter.Iterator = c.Iterator(stopIter.Iterator)
+		stopIter.i = -1
+		stopIter.stopAfter = stopAfter
+		return stopIter
 	}
-
-	ts, _ := it.At()
-
-	for t > ts || it.i == -1 {
-		if !it.Next() {
-			return false
-		}
-		ts, _ = it.At()
+	return &stopIterator{
+		Iterator:  c.Iterator(it),
+		i:         -1,
+		stopAfter: stopAfter,
 	}
-
-	return true
-}
-
-func (it *memSafeIterator) Next() bool {
-	if it.i+1 >= it.stopAfter {
-		return false
-	}
-	it.i++
-	if it.total-it.i > 4 {
-		return it.Iterator.Next()
-	}
-	return true
-}
-
-func (it *memSafeIterator) At() (int64, float64) {
-	if it.total-it.i > 4 {
-		return it.Iterator.At()
-	}
-	s := it.buf[4-(it.total-it.i)]
-	return s.t, s.v
 }
 
 // stopIterator wraps an Iterator, but only returns the first
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 3a6266e22e..cbea0a7b98 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -767,16 +767,6 @@ func TestMemSeries_truncateChunks(t *testing.T) {
 	chk, _, err = s.chunk(lastID, chunkDiskMapper, &memChunkPool)
 	require.NoError(t, err)
 	require.Equal(t, lastChunk, chk)
-
-	// Validate that the series' sample buffer is applied correctly to the last chunk
-	// after truncation.
-	it1 := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, &memChunkPool, nil)
-	_, ok := it1.(*memSafeIterator)
-	require.True(t, ok)
-
-	it2 := s.iterator(s.headChunkID(len(s.mmappedChunks)-1), nil, chunkDiskMapper, &memChunkPool, nil)
-	_, ok = it2.(*memSafeIterator)
-	require.False(t, ok, "non-last chunk incorrectly wrapped with sample buffer")
 }
 
 func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
@@ -2486,7 +2476,7 @@ func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) {
 	}
 }
 
-func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
+func TestIteratorSeekIntoBuffer(t *testing.T) {
 	dir := t.TempDir()
 	// This is usually taken from the Head, but passing manually here.
 	chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
@@ -2504,11 +2494,9 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
 	}
 
 	it := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil, nil)
-	_, ok := it.(*memSafeIterator)
-	require.True(t, ok)
 
 	// First point.
-	ok = it.Seek(0)
+	ok := it.Seek(0)
 	require.True(t, ok)
 	ts, val := it.At()
 	require.Equal(t, int64(0), ts)
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index a69cd8019c..0a5ee05995 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -778,7 +778,7 @@ type chunkSnapshotRecord struct {
 	ref       chunks.HeadSeriesRef
 	lset      labels.Labels
 	mc        *memChunk
-	sampleBuf [4]sample
+	lastValue float64
 }
 
 func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
@@ -798,11 +798,13 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
 		buf.PutBE64int64(s.headChunk.maxTime)
 		buf.PutByte(byte(s.headChunk.chunk.Encoding()))
 		buf.PutUvarintBytes(s.headChunk.chunk.Bytes())
-		// Put the sample buf.
-		for _, smpl := range s.sampleBuf {
-			buf.PutBE64int64(smpl.t)
-			buf.PutBEFloat64(smpl.v)
+		// Backwards compatibility for the old sampleBuf, which held the last 4 samples.
+		for i := 0; i < 3; i++ {
+			buf.PutBE64int64(0)
+			buf.PutBEFloat64(0)
 		}
+		buf.PutBE64int64(0)
+		buf.PutBEFloat64(s.lastValue)
 	}
 
 	s.Unlock()
@@ -842,10 +844,13 @@ func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapsh
 	}
 	csr.mc.chunk = chk
 
-	for i := range csr.sampleBuf {
-		csr.sampleBuf[i].t = dec.Be64int64()
-		csr.sampleBuf[i].v = dec.Be64Float64()
+	// Backwards compatibility for the old sampleBuf, which held the last 4 samples.
+	for i := 0; i < 3; i++ {
+		_ = dec.Be64int64()
+		_ = dec.Be64Float64()
 	}
+	_ = dec.Be64int64()
+	csr.lastValue = dec.Be64Float64()
 
 	err = dec.Err()
 	if err != nil && len(dec.B) > 0 {
@@ -1222,10 +1227,7 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
 		}
 		series.nextAt = csr.mc.maxTime // This will create a new chunk on append.
 		series.headChunk = csr.mc
-		for i := range series.sampleBuf {
-			series.sampleBuf[i].t = csr.sampleBuf[i].t
-			series.sampleBuf[i].v = csr.sampleBuf[i].v
-		}
+		series.lastValue = csr.lastValue
 
 		app, err := series.headChunk.chunk.Appender()
 		if err != nil {
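
Reviewer note (not part of the patch): the sketch below illustrates the concurrency contract that the new xor.go comments document: Appender() and Iterator() themselves must not race with modifications to the chunk, but once Iterator() has returned, the iterator may be used while the appender keeps adding samples. It assumes the chunkenc API as of this change (Iterator.Next() returning bool, At() returning (int64, float64)); treat it as an illustration under those assumptions, not a definitive test.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	c := chunkenc.NewXORChunk()

	// A single Appender per chunk; creating or using several concurrently is invalid.
	app, err := c.Appender()
	if err != nil {
		panic(err)
	}
	for ts := int64(0); ts < 100; ts++ {
		app.Append(ts, float64(ts))
	}

	// Iterator() itself must not run concurrently with appends.
	it := c.Iterator(nil)

	// Once Iterator() has returned, the iterator may run alongside further appends.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for ts := int64(100); ts < 200; ts++ {
			app.Append(ts, float64(ts))
		}
	}()

	n := 0
	for it.Next() {
		_, _ = it.At()
		n++
	}
	if err := it.Err(); err != nil {
		panic(err)
	}
	<-done

	// The iterator sees exactly the 100 samples that existed when it was created.
	fmt.Println("samples read:", n)
}

This works because xorIterator captures the sample count when it is created, and bstreamReader never dereferences the live last byte of the stream, using instead the copy taken in newBReader; the concurrent appender therefore only mutates bytes the reader will never read.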