diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ce5a290a16..deb16c4160 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -41,6 +41,7 @@ jobs: - uses: ./.github/promci/actions/setup_environment - run: go test --tags=dedupelabels ./... - run: go test --tags=slicelabels -race ./cmd/prometheus + - run: go test --tags=slicelabels -race ./prompb/io/prometheus/client - run: go test --tags=forcedirectio -race ./tsdb/ - run: GOARCH=386 go test ./... - uses: ./.github/promci/actions/check_proto diff --git a/prompb/io/prometheus/client/decoder_test.go b/prompb/io/prometheus/client/decoder_test.go index 8697b78fca..18cf186127 100644 --- a/prompb/io/prometheus/client/decoder_test.go +++ b/prompb/io/prometheus/client/decoder_test.go @@ -17,13 +17,17 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" "io" + "math/rand" "testing" "github.com/gogo/protobuf/proto" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/util/pool" ) const ( @@ -169,3 +173,74 @@ func TestMetricStreamingDecoder(t *testing.T) { // Expect labels and metricBytes to be static and reusable even after parsing. require.Equal(t, `{checksum="", path="github.com/prometheus/client_golang", version="(devel)"}`, firstMetricLset.String()) } + +func TestMetricStreamingDecoder_LabelsCorruption(t *testing.T) { + lastScrapeSize := 0 + var allPreviousLabels []labels.Labels + buffers := pool.New(128, 1024, 2, func(sz int) interface{} { return make([]byte, 0, sz) }) + builder := labels.NewScratchBuilder(0) + for _, labelsCount := range []int{1, 2, 3, 5, 8, 5, 3, 2, 1} { + // Get buffer from pool like in scrape.go + b := buffers.Get(lastScrapeSize).([]byte) + buf := bytes.NewBuffer(b) + + // Generate some scraped data to parse + mf := &MetricFamily{} + data := generateMetricFamilyText(labelsCount) + require.NoError(t, proto.UnmarshalText(data, mf)) + protoBuf, err := proto.Marshal(mf) + require.NoError(t, err) + sizeBuf := make([]byte, binary.MaxVarintLen32) + sizeBufSize := binary.PutUvarint(sizeBuf, uint64(len(protoBuf))) + buf.Write(sizeBuf[:sizeBufSize]) + buf.Write(protoBuf) + + // Use decoder like protobufparse.go would + b = buf.Bytes() + d := NewMetricStreamingDecoder(b) + require.NoError(t, d.NextMetricFamily()) + require.NoError(t, d.NextMetric()) + + // Get the labels + builder.Reset() + require.NoError(t, d.Label(&builder)) // <- this uses unsafe strings to create labels + lbs := builder.Labels() + allPreviousLabels = append(allPreviousLabels, lbs) + + // Validate all labels seen so far remain valid and not corrupted + for _, l := range allPreviousLabels { + require.True(t, l.IsValid(model.LegacyValidation), "encountered corrupted labels: %v", l) + } + + lastScrapeSize = len(b) + buffers.Put(b) + } +} + +func generateLabels() string { + randomName := fmt.Sprintf("instance_%d", rand.Intn(1000)) + randomValue := fmt.Sprintf("value_%d", rand.Intn(1000)) + return fmt.Sprintf(`label: < + name: "%s" + value: "%s" + >`, randomName, randomValue) +} + +func generateMetricFamilyText(labelsCount int) string { + randomName := fmt.Sprintf("metric_%d", rand.Intn(1000)) + randomHelp := fmt.Sprintf("Test metric to demonstrate forced corruption %d.", rand.Intn(1000)) + labels10 := "" + for i := 0; i < labelsCount; i++ { + labels10 += generateLabels() + } + return fmt.Sprintf(`name: "%s" +help: "%s" +type: GAUGE +metric: < + %s + gauge: < + value: 1.0 + > +> +`, randomName, randomHelp, labels10) +}