diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 21b51ba5ef..786ed0884f 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -24,6 +24,7 @@ import ( "time" "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" deltatocumulative "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -58,7 +59,11 @@ type writeHandler struct { ingestCTZeroSample bool } -const maxAheadTime = 10 * time.Minute +const ( + maxAheadTime = 10 * time.Minute + // decodeWriteLimit is the maximum decoded size of a remote write request body in bytes. + decodeWriteLimit = 32 * 1024 * 1024 +) // NewWriteHandler creates a http.Handler that accepts remote write requests with // the given message in acceptedProtoMsgs and writes them to the provided appendable. @@ -163,6 +168,19 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + decodedLen, err := snappy.DecodedLen(body) + if err != nil { + h.logger.Error("Error decoding remote write request", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if decodedLen > decodeWriteLimit { + err := fmt.Errorf("snappy: decoded length %d exceeds limit %d", decodedLen, decodeWriteLimit) + h.logger.Error("Error decoding remote write request", "err", err.Error()) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + decompressed, err := compression.Decode(compression.Snappy, body, nil) if err != nil { // TODO(bwplotka): Add more context to responded error? diff --git a/storage/remote/write_handler_test.go b/storage/remote/write_handler_test.go index e3207bf273..16395647f2 100644 --- a/storage/remote/write_handler_test.go +++ b/storage/remote/write_handler_test.go @@ -245,6 +245,22 @@ func TestRemoteWriteHandlerHeadersHandling_V2Message(t *testing.T) { } } +func TestRemoteWriteHandlerTooLarge(t *testing.T) { + // 5-byte snappy stream whose header claims 256 MiB decoded length, + // well above decodeWriteLimit (32 MiB). + bomb := []byte{0x80, 0x80, 0x80, 0x80, 0x01} + req, err := http.NewRequest(http.MethodPost, "/", bytes.NewReader(bomb)) + require.NoError(t, err) + + appendable := &mockAppendable{} + handler := NewWriteHandler(promslog.NewNopLogger(), nil, appendable, []config.RemoteWriteProtoMsg{config.RemoteWriteProtoMsgV1}, false) + + recorder := httptest.NewRecorder() + handler.ServeHTTP(recorder, req) + require.Equal(t, http.StatusBadRequest, recorder.Code) + require.Contains(t, recorder.Body.String(), "exceeds limit") +} + func TestRemoteWriteHandler_V1Message(t *testing.T) { payload, _, _, err := buildWriteRequest(nil, writeRequestFixture.Timeseries, nil, nil, nil, nil, "snappy") require.NoError(t, err)