From 1e58d792a57ee81c44e54ffe3cce7a132f55a389 Mon Sep 17 00:00:00 2001 From: Charles Korn Date: Mon, 7 Jul 2025 17:23:34 +1000 Subject: [PATCH] storage/remote: fix "http: read on closed response body" errors if chunkedSeriesSet.Next is called again after the series set is exhausted (#16838) Signed-off-by: Charles Korn --- storage/remote/codec.go | 12 ++++++++++-- storage/remote/codec_test.go | 35 +++++++++++++++++++++++++++++++++-- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 80bb811500..3dbf432bcf 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -547,8 +547,9 @@ type chunkedSeriesSet struct { mint, maxt int64 cancel func(error) - current storage.Series - err error + current storage.Series + err error + exhausted bool } func NewChunkedSeriesSet(chunkedReader *ChunkedReader, respBody io.ReadCloser, mint, maxt int64, cancel func(error)) storage.SeriesSet { @@ -564,6 +565,12 @@ func NewChunkedSeriesSet(chunkedReader *ChunkedReader, respBody io.ReadCloser, m // Next return true if there is a next series and false otherwise. It will // block until the next series is available. func (s *chunkedSeriesSet) Next() bool { + if s.exhausted { + // Don't try to read the next series again. + // This prevents errors like "http: read on closed response body" if Next() is called after it has already returned false. + return false + } + res := &prompb.ChunkedReadResponse{} err := s.chunkedReader.NextProto(res) @@ -575,6 +582,7 @@ func (s *chunkedSeriesSet) Next() bool { _ = s.respBody.Close() s.cancel(err) + s.exhausted = true return false } diff --git a/storage/remote/codec_test.go b/storage/remote/codec_test.go index c92f0f8cde..7aba1a133e 100644 --- a/storage/remote/codec_test.go +++ b/storage/remote/codec_test.go @@ -15,6 +15,7 @@ package remote import ( "bytes" + "errors" "fmt" "io" "sync" @@ -892,7 +893,8 @@ func TestChunkedSeriesSet(t *testing.T) { flusher := &mockFlusher{} w := NewChunkedWriter(buf, flusher) - r := NewChunkedReader(buf, config.DefaultChunkedReadLimit, nil) + wrappedReader := newOneShotCloser(buf) + r := NewChunkedReader(wrappedReader, config.DefaultChunkedReadLimit, nil) chks := buildTestChunks(t) l := []prompb.Label{ @@ -913,7 +915,7 @@ func TestChunkedSeriesSet(t *testing.T) { require.NoError(t, err) } - ss := NewChunkedSeriesSet(r, io.NopCloser(buf), 0, 14000, func(error) {}) + ss := NewChunkedSeriesSet(r, wrappedReader, 0, 14000, func(error) {}) require.NoError(t, ss.Err()) require.Nil(t, ss.Warnings()) @@ -938,6 +940,9 @@ func TestChunkedSeriesSet(t *testing.T) { } require.Equal(t, numTestChunks, numResponses) require.NoError(t, ss.Err()) + + require.False(t, ss.Next(), "Next() should still return false after it previously returned false") + require.NoError(t, ss.Err(), "Err() should not return an error if Next() is called again after it previously returned false") }) t.Run("chunked reader error", func(t *testing.T) { @@ -983,6 +988,32 @@ type mockFlusher struct{} func (f *mockFlusher) Flush() {} +type oneShotCloser struct { + r io.Reader + closed bool +} + +func newOneShotCloser(r io.Reader) io.ReadCloser { + return &oneShotCloser{r, false} +} + +func (c *oneShotCloser) Read(p []byte) (n int, err error) { + if c.closed { + return 0, errors.New("already closed") + } + + return c.r.Read(p) +} + +func (c *oneShotCloser) Close() error { + if c.closed { + return errors.New("already closed") + } + + c.closed = true + return nil +} + const ( numTestChunks = 3 numSamplesPerTestChunk = 5