diff --git a/internal/s3select/simdj/reader.go b/internal/s3select/simdj/reader.go index 86a72e433..63c35cf45 100644 --- a/internal/s3select/simdj/reader.go +++ b/internal/s3select/simdj/reader.go @@ -81,7 +81,7 @@ func (r *Reader) Close() error { // If r.input is closed, it is assumed that no more input will come. // When this function returns r.readerWg will be decremented and r.decoded will be closed. // On errors, r.err will be set. This should only be accessed after r.decoded has been closed. -func (r *Reader) startReader(reuse chan<- *simdjson.ParsedJson) { +func (r *Reader) startReader() { defer r.onReaderExit() var tmpObj simdjson.Object for { @@ -144,11 +144,6 @@ func (r *Reader) startReader(reuse chan<- *simdjson.ParsedJson) { return } } - // Don't block if we cannot reuse. - select { - case reuse <- in.Value: - default: - } if in.Error == io.EOF { return } @@ -174,10 +169,10 @@ func NewReader(readCloser io.ReadCloser, args *json.ReaderArgs) *Reader { r.readerWg.Done() } - reuse := make(chan *simdjson.ParsedJson, 1000) - simdjson.ParseNDStream(readCloser, r.input, reuse) + // We cannot reuse as we are sending parsed objects elsewhere. + simdjson.ParseNDStream(readCloser, r.input, nil) r.readerWg.Add(1) - go r.startReader(reuse) + go r.startReader() return &r }