storage/remote: fix "http: read on closed response body" errors if chunkedSeriesSet.Next is called again after the series set is exhausted (#16838)

Signed-off-by: Charles Korn <charles.korn@grafana.com>
This commit is contained in:
Charles Korn 2025-07-07 17:23:34 +10:00 committed by GitHub
parent 938e5cb62b
commit 1e58d792a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 43 additions and 4 deletions

View File

@ -547,8 +547,9 @@ type chunkedSeriesSet struct {
mint, maxt int64 mint, maxt int64
cancel func(error) cancel func(error)
current storage.Series current storage.Series
err error err error
exhausted bool
} }
func NewChunkedSeriesSet(chunkedReader *ChunkedReader, respBody io.ReadCloser, mint, maxt int64, cancel func(error)) storage.SeriesSet { func NewChunkedSeriesSet(chunkedReader *ChunkedReader, respBody io.ReadCloser, mint, maxt int64, cancel func(error)) storage.SeriesSet {
@ -564,6 +565,12 @@ func NewChunkedSeriesSet(chunkedReader *ChunkedReader, respBody io.ReadCloser, m
// Next return true if there is a next series and false otherwise. It will // Next return true if there is a next series and false otherwise. It will
// block until the next series is available. // block until the next series is available.
func (s *chunkedSeriesSet) Next() bool { func (s *chunkedSeriesSet) Next() bool {
if s.exhausted {
// Don't try to read the next series again.
// This prevents errors like "http: read on closed response body" if Next() is called after it has already returned false.
return false
}
res := &prompb.ChunkedReadResponse{} res := &prompb.ChunkedReadResponse{}
err := s.chunkedReader.NextProto(res) err := s.chunkedReader.NextProto(res)
@ -575,6 +582,7 @@ func (s *chunkedSeriesSet) Next() bool {
_ = s.respBody.Close() _ = s.respBody.Close()
s.cancel(err) s.cancel(err)
s.exhausted = true
return false return false
} }

View File

@ -15,6 +15,7 @@ package remote
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"io" "io"
"sync" "sync"
@ -892,7 +893,8 @@ func TestChunkedSeriesSet(t *testing.T) {
flusher := &mockFlusher{} flusher := &mockFlusher{}
w := NewChunkedWriter(buf, flusher) w := NewChunkedWriter(buf, flusher)
r := NewChunkedReader(buf, config.DefaultChunkedReadLimit, nil) wrappedReader := newOneShotCloser(buf)
r := NewChunkedReader(wrappedReader, config.DefaultChunkedReadLimit, nil)
chks := buildTestChunks(t) chks := buildTestChunks(t)
l := []prompb.Label{ l := []prompb.Label{
@ -913,7 +915,7 @@ func TestChunkedSeriesSet(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
ss := NewChunkedSeriesSet(r, io.NopCloser(buf), 0, 14000, func(error) {}) ss := NewChunkedSeriesSet(r, wrappedReader, 0, 14000, func(error) {})
require.NoError(t, ss.Err()) require.NoError(t, ss.Err())
require.Nil(t, ss.Warnings()) require.Nil(t, ss.Warnings())
@ -938,6 +940,9 @@ func TestChunkedSeriesSet(t *testing.T) {
} }
require.Equal(t, numTestChunks, numResponses) require.Equal(t, numTestChunks, numResponses)
require.NoError(t, ss.Err()) require.NoError(t, ss.Err())
require.False(t, ss.Next(), "Next() should still return false after it previously returned false")
require.NoError(t, ss.Err(), "Err() should not return an error if Next() is called again after it previously returned false")
}) })
t.Run("chunked reader error", func(t *testing.T) { t.Run("chunked reader error", func(t *testing.T) {
@ -983,6 +988,32 @@ type mockFlusher struct{}
func (f *mockFlusher) Flush() {} func (f *mockFlusher) Flush() {}
type oneShotCloser struct {
r io.Reader
closed bool
}
func newOneShotCloser(r io.Reader) io.ReadCloser {
return &oneShotCloser{r, false}
}
func (c *oneShotCloser) Read(p []byte) (n int, err error) {
if c.closed {
return 0, errors.New("already closed")
}
return c.r.Read(p)
}
func (c *oneShotCloser) Close() error {
if c.closed {
return errors.New("already closed")
}
c.closed = true
return nil
}
const ( const (
numTestChunks = 3 numTestChunks = 3
numSamplesPerTestChunk = 5 numSamplesPerTestChunk = 5