diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8251f960e9..2e4b628699 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -45,7 +45,8 @@ jobs: steps: - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 - run: make build - - run: make test GO_ONLY=1 + # Don't run NPM build; don't run race-detector. + - run: make test GO_ONLY=1 test-flags="" test_ui: name: UI tests diff --git a/.github/workflows/container_description.yml b/.github/workflows/container_description.yml new file mode 100644 index 0000000000..0b8a8cc05f --- /dev/null +++ b/.github/workflows/container_description.yml @@ -0,0 +1,55 @@ +--- +name: Push README to Docker Hub +on: + push: + paths: + - "README.md" + - ".github/workflows/container_description.yml" + branches: [ main, master ] + +permissions: + contents: read + +jobs: + PushDockerHubReadme: + runs-on: ubuntu-latest + name: Push README to Docker Hub + if: github.repository_owner == 'prometheus' || github.repository_owner == 'prometheus-community' # Don't run this workflow on forks. + steps: + - name: git checkout + uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 + - name: Set docker hub repo name + run: echo "DOCKER_REPO_NAME=$(make common-docker-repo-name)" >> $GITHUB_ENV + - name: Push README to Dockerhub + uses: christian-korneck/update-container-description-action@d36005551adeaba9698d8d67a296bd16fa91f8e8 # v1 + env: + DOCKER_USER: ${{ secrets.DOCKER_HUB_LOGIN }} + DOCKER_PASS: ${{ secrets.DOCKER_HUB_PASSWORD }} + with: + destination_container_repo: ${{ env.DOCKER_REPO_NAME }} + provider: dockerhub + short_description: ${{ env.DOCKER_REPO_NAME }} + readme_file: 'README.md' + + PushQuayIoReadme: + runs-on: ubuntu-latest + name: Push README to quay.io + if: github.repository_owner == 'prometheus' || github.repository_owner == 'prometheus-community' # Don't run this workflow on forks. + steps: + - name: git checkout + uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 + - name: Set quay.io org name + run: echo "DOCKER_REPO=$(echo quay.io/${GITHUB_REPOSITORY_OWNER} | tr -d '-')" >> $GITHUB_ENV + - name: Set quay.io repo name + run: echo "DOCKER_REPO_NAME=$(make common-docker-repo-name)" >> $GITHUB_ENV + - name: Push README to quay.io + uses: christian-korneck/update-container-description-action@d36005551adeaba9698d8d67a296bd16fa91f8e8 # v1 + env: + DOCKER_APIKEY: ${{ secrets.QUAY_IO_API_TOKEN }} + with: + destination_container_repo: ${{ env.DOCKER_REPO_NAME }} + provider: quay + readme_file: 'README.md' + + + diff --git a/CHANGELOG.md b/CHANGELOG.md index 8951866819..52c6f35586 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## unreleased + +* [CHANGE] TSDB: Fix the predicate checking for blocks which are beyond the retention period to include the ones right at the retention boundary. #9633 + ## 2.51.0 / 2024-03-18 This version is built with Go 1.22.1. 
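A note on the quay.io job in the new workflow above: the target organization is derived by stripping dashes from the GitHub owner, so `prometheus-community` maps to `quay.io/prometheuscommunity`. A minimal Go sketch of that derivation, mirroring the `tr -d '-'` shell pipeline (the function name is illustrative):

```go
package main

import (
	"fmt"
	"strings"
)

// quayRepo mirrors `echo quay.io/${GITHUB_REPOSITORY_OWNER} | tr -d '-'`
// from the workflow: the quay.io organization is the GitHub owner with
// all dashes removed.
func quayRepo(owner string) string {
	return strings.ReplaceAll("quay.io/"+owner, "-", "")
}

func main() {
	fmt.Println(quayRepo("prometheus"))           // quay.io/prometheus
	fmt.Println(quayRepo("prometheus-community")) // quay.io/prometheuscommunity
}
```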
diff --git a/MAINTAINERS.md b/MAINTAINERS.md index a776eb3594..8113ac5296 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -1,6 +1,10 @@ # Maintainers -Julien Pivotto ( / @roidelapluie) and Levi Harrison ( / @LeviHarrison) are the main/default maintainers, some parts of the codebase have other maintainers: +General maintainers: +* Bryan Boreham (bjboreham@gmail.com / @bboreham) +* Levi Harrison (levi@leviharrison.dev / @LeviHarrison) +* Ayoub Mrini (ayoubmrini424@gmail.com / @machine424) +* Julien Pivotto (roidelapluie@prometheus.io / @roidelapluie) * `cmd` * `promtool`: David Leadbeater ( / @dgl) @@ -19,7 +23,6 @@ George Krajcsovits ( / @krajorama) * `module`: Augustin Husson ( @nexucis) * `Makefile` and related build configuration: Simon Pasquier ( / @simonpasquier), Ben Kochie ( / @SuperQ) - For the sake of brevity, not all subtrees are explicitly listed. Due to the size of this repository, the natural changes in focus of maintainers over time, and nuances of where particular features live, this list will always be diff --git a/Makefile.common b/Makefile.common index 92558151e3..49ed5f5478 100644 --- a/Makefile.common +++ b/Makefile.common @@ -208,6 +208,10 @@ common-tarball: promu @echo ">> building release tarball" $(PROMU) tarball --prefix $(PREFIX) $(BIN_DIR) +.PHONY: common-docker-repo-name +common-docker-repo-name: + @echo "$(DOCKER_REPO)/$(DOCKER_IMAGE_NAME)" + .PHONY: common-docker $(BUILD_DOCKER_ARCHS) common-docker: $(BUILD_DOCKER_ARCHS) $(BUILD_DOCKER_ARCHS): common-docker-%: diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 614d884637..f64c00e824 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -960,8 +960,8 @@ func main() { func() error { // Don't forget to release the reloadReady channel so that waiting blocks can exit normally. select { - case <-term: - level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...") + case sig := <-term: + level.Warn(logger).Log("msg", "Received an OS signal, exiting gracefully...", "signal", sig.String()) reloadReady.Close() case <-webHandler.Quit(): level.Warn(logger).Log("msg", "Received termination request via web service, exiting gracefully...") diff --git a/config/config_test.go b/config/config_test.go index 36b125f794..14981d25f0 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -1840,7 +1840,7 @@ var expectedErrors = []struct { }, { filename: "azure_authentication_method.bad.yml", - errMsg: "unknown authentication_type \"invalid\". Supported types are \"OAuth\" or \"ManagedIdentity\"", + errMsg: "unknown authentication_type \"invalid\". Supported types are \"OAuth\", \"ManagedIdentity\" or \"SDK\"", }, { filename: "azure_bearertoken_basicauth.bad.yml", diff --git a/discovery/azure/azure.go b/discovery/azure/azure.go index 16628c7bfd..746496a699 100644 --- a/discovery/azure/azure.go +++ b/discovery/azure/azure.go @@ -65,6 +65,7 @@ const ( azureLabelMachineSize = azureLabel + "machine_size" authMethodOAuth = "OAuth" + authMethodSDK = "SDK" authMethodManagedIdentity = "ManagedIdentity" ) @@ -164,8 +165,8 @@ func (c *SDConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { } } - if c.AuthenticationMethod != authMethodOAuth && c.AuthenticationMethod != authMethodManagedIdentity { - return fmt.Errorf("unknown authentication_type %q. 
Supported types are %q or %q", c.AuthenticationMethod, authMethodOAuth, authMethodManagedIdentity) + if c.AuthenticationMethod != authMethodOAuth && c.AuthenticationMethod != authMethodManagedIdentity && c.AuthenticationMethod != authMethodSDK { + return fmt.Errorf("unknown authentication_type %q. Supported types are %q, %q or %q", c.AuthenticationMethod, authMethodOAuth, authMethodManagedIdentity, authMethodSDK) } return c.HTTPClientConfig.Validate() @@ -294,6 +295,16 @@ func newCredential(cfg SDConfig, policyClientOptions policy.ClientOptions) (azco return nil, err } credential = azcore.TokenCredential(secretCredential) + case authMethodSDK: + options := &azidentity.DefaultAzureCredentialOptions{ClientOptions: policyClientOptions} + if len(cfg.TenantID) != 0 { + options.TenantID = cfg.TenantID + } + sdkCredential, err := azidentity.NewDefaultAzureCredential(options) + if err != nil { + return nil, err + } + credential = azcore.TokenCredential(sdkCredential) } return credential, nil } diff --git a/docs/configuration/configuration.md b/docs/configuration/configuration.md index e134ae02b7..d751a4084e 100644 --- a/docs/configuration/configuration.md +++ b/docs/configuration/configuration.md @@ -600,8 +600,10 @@ See below for the configuration options for Azure discovery: # The Azure environment. [ environment: | default = AzurePublicCloud ] -# The authentication method, either OAuth or ManagedIdentity. +# The authentication method, either OAuth, ManagedIdentity or SDK. # See https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview +# SDK authentication method uses environment variables by default. +# See https://learn.microsoft.com/en-us/azure/developer/go/azure-sdk-authentication [ authentication_method: | default = OAuth] # The subscription ID. Always required. subscription_id: @@ -3619,6 +3621,11 @@ azuread: [ client_secret: ] [ tenant_id: ] ] + # Azure SDK auth. + # See https://learn.microsoft.com/en-us/azure/developer/go/azure-sdk-authentication + [ sdk: + [ tenant_id: ] ] + # Configures the remote write request's TLS settings. 
tls_config: [ ] diff --git a/documentation/examples/remote_storage/go.mod b/documentation/examples/remote_storage/go.mod index 38497cbf50..917563f00c 100644 --- a/documentation/examples/remote_storage/go.mod +++ b/documentation/examples/remote_storage/go.mod @@ -9,7 +9,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/influxdata/influxdb v1.11.5 github.com/prometheus/client_golang v1.19.0 - github.com/prometheus/common v0.49.0 + github.com/prometheus/common v0.50.0 github.com/prometheus/prometheus v0.50.1 github.com/stretchr/testify v1.9.0 ) @@ -58,17 +58,17 @@ require ( go.opentelemetry.io/otel/trace v1.22.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.19.0 // indirect + golang.org/x/crypto v0.21.0 // indirect golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect - golang.org/x/net v0.21.0 // indirect - golang.org/x/oauth2 v0.17.0 // indirect - golang.org/x/sys v0.17.0 // indirect + golang.org/x/net v0.22.0 // indirect + golang.org/x/oauth2 v0.18.0 // indirect + golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240116215550-a9fa1716bcac // indirect google.golang.org/grpc v1.61.0 // indirect - google.golang.org/protobuf v1.32.0 // indirect + google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apimachinery v0.28.6 // indirect diff --git a/documentation/examples/remote_storage/go.sum b/documentation/examples/remote_storage/go.sum index 6ffa445b29..50db8d7934 100644 --- a/documentation/examples/remote_storage/go.sum +++ b/documentation/examples/remote_storage/go.sum @@ -269,8 +269,8 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= github.com/prometheus/common v0.29.0/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= -github.com/prometheus/common v0.49.0 h1:ToNTdK4zSnPVJmh698mGFkDor9wBI/iGaJy5dbH1EgI= -github.com/prometheus/common v0.49.0/go.mod h1:Kxm+EULxRbUkjGU6WFsQqo3ORzB4tyKvlWFOE9mB2sE= +github.com/prometheus/common v0.50.0 h1:YSZE6aa9+luNa2da6/Tik0q0A5AbR+U003TItK57CPQ= +github.com/prometheus/common v0.50.0/go.mod h1:wHFBCEVWVmHMUpg7pYcOm2QUR/ocQdYSJVQJKnHc3xQ= github.com/prometheus/common/sigv4 v0.1.0 h1:qoVebwtwwEhS85Czm2dSROY5fTo2PAPEVdDeppTwGX4= github.com/prometheus/common/sigv4 v0.1.0/go.mod h1:2Jkxxk9yYvCkE5G1sQT7GuEXm57JrvHu9k5YwTjsNtI= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= @@ -332,8 +332,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo= -golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto 
v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/exp v0.0.0-20240119083558-1b970713d09a h1:Q8/wZp0KX97QFTc2ywcOE0YRjZPVIx+MXInMzdvQqcA= golang.org/x/exp v0.0.0-20240119083558-1b970713d09a/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -356,12 +356,12 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= -golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= +golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ= -golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5HA= +golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI= +golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -389,12 +389,12 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= -golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U= -golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= +golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= @@ -436,8 +436,8 @@ google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzi google.golang.org/protobuf 
v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= -google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= +google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/model/labels/labels_common.go b/model/labels/labels_common.go index 4c4a87e872..f46321c97e 100644 --- a/model/labels/labels_common.go +++ b/model/labels/labels_common.go @@ -39,7 +39,8 @@ type Label struct { } func (ls Labels) String() string { - var b bytes.Buffer + var bytea [1024]byte // On stack to avoid memory allocation while building the output. + b := bytes.NewBuffer(bytea[:0]) b.WriteByte('{') i := 0 @@ -50,7 +51,7 @@ func (ls Labels) String() string { } b.WriteString(l.Name) b.WriteByte('=') - b.WriteString(strconv.Quote(l.Value)) + b.Write(strconv.AppendQuote(b.AvailableBuffer(), l.Value)) i++ }) b.WriteByte('}') diff --git a/model/labels/labels_test.go b/model/labels/labels_test.go index c2ac6d63a0..49b4b4e67b 100644 --- a/model/labels/labels_test.go +++ b/model/labels/labels_test.go @@ -43,6 +43,13 @@ func TestLabels_String(t *testing.T) { } } +func BenchmarkString(b *testing.B) { + ls := New(benchmarkLabels...) + for i := 0; i < b.N; i++ { + _ = ls.String() + } +} + func TestLabels_MatchLabels(t *testing.T) { labels := FromStrings( "__name__", "ALERTS", @@ -785,24 +792,24 @@ func BenchmarkLabels_Hash(b *testing.B) { } } -func BenchmarkBuilder(b *testing.B) { - m := []Label{ - {"job", "node"}, - {"instance", "123.123.1.211:9090"}, - {"path", "/api/v1/namespaces//deployments/"}, - {"method", "GET"}, - {"namespace", "system"}, - {"status", "500"}, - {"prometheus", "prometheus-core-1"}, - {"datacenter", "eu-west-1"}, - {"pod_name", "abcdef-99999-defee"}, - } +var benchmarkLabels = []Label{ + {"job", "node"}, + {"instance", "123.123.1.211:9090"}, + {"path", "/api/v1/namespaces//deployments/"}, + {"method", "GET"}, + {"namespace", "system"}, + {"status", "500"}, + {"prometheus", "prometheus-core-1"}, + {"datacenter", "eu-west-1"}, + {"pod_name", "abcdef-99999-defee"}, +} +func BenchmarkBuilder(b *testing.B) { var l Labels builder := NewBuilder(EmptyLabels()) for i := 0; i < b.N; i++ { builder.Reset(EmptyLabels()) - for _, l := range m { + for _, l := range benchmarkLabels { builder.Set(l.Name, l.Value) } l = builder.Labels() @@ -811,18 +818,7 @@ func BenchmarkBuilder(b *testing.B) { } func BenchmarkLabels_Copy(b *testing.B) { - m := map[string]string{ - "job": "node", - "instance": "123.123.1.211:9090", - "path": "/api/v1/namespaces//deployments/", - "method": "GET", - "namespace": "system", - "status": "500", - "prometheus": "prometheus-core-1", - "datacenter": "eu-west-1", - "pod_name": "abcdef-99999-defee", - } - l := FromMap(m) + l := New(benchmarkLabels...) 
for i := 0; i < b.N; i++ { l = l.Copy() diff --git a/promql/engine.go b/promql/engine.go index 68e0502907..cf60f477af 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -115,6 +115,12 @@ func (e ErrStorage) Error() string { return e.Err.Error() } +// QueryEngine defines the interface for the *promql.Engine, so it can be replaced, wrapped or mocked. +type QueryEngine interface { + NewInstantQuery(ctx context.Context, q storage.Queryable, opts QueryOpts, qs string, ts time.Time) (Query, error) + NewRangeQuery(ctx context.Context, q storage.Queryable, opts QueryOpts, qs string, start, end time.Time, interval time.Duration) (Query, error) +} + // QueryLogger is an interface that can be used to log all the queries logged // by the engine. type QueryLogger interface { @@ -1196,6 +1202,9 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) if prepSeries != nil { bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si]) } + // Don't add histogram size here because we only + // copy the pointer above, not the whole + // histogram. ev.currentSamples++ if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) @@ -1221,7 +1230,6 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper) if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } - ev.samplesStats.UpdatePeak(ev.currentSamples) // If this could be an instant query, shortcut so as not to change sort order. if ev.endTimestamp == ev.startTimestamp { @@ -1540,13 +1548,12 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio histSamples := totalHPointSize(ss.Histograms) if len(ss.Floats)+histSamples > 0 { - if ev.currentSamples+len(ss.Floats)+histSamples <= ev.maxSamples { - mat = append(mat, ss) - prevSS = &mat[len(mat)-1] - ev.currentSamples += len(ss.Floats) + histSamples - } else { + if ev.currentSamples+len(ss.Floats)+histSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } + mat = append(mat, ss) + prevSS = &mat[len(mat)-1] + ev.currentSamples += len(ss.Floats) + histSamples } ev.samplesStats.UpdatePeak(ev.currentSamples) @@ -1709,26 +1716,28 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio step++ _, f, h, ok := ev.vectorSelectorSingle(it, e, ts) if ok { - if ev.currentSamples < ev.maxSamples { - if h == nil { - if ss.Floats == nil { - ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps) - } - ss.Floats = append(ss.Floats, FPoint{F: f, T: ts}) - ev.currentSamples++ - ev.samplesStats.IncrementSamplesAtStep(step, 1) - } else { - if ss.Histograms == nil { - ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps) - } - point := HPoint{H: h, T: ts} - ss.Histograms = append(ss.Histograms, point) - histSize := point.size() - ev.currentSamples += histSize - ev.samplesStats.IncrementSamplesAtStep(step, int64(histSize)) + if h == nil { + ev.currentSamples++ + ev.samplesStats.IncrementSamplesAtStep(step, 1) + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) } + if ss.Floats == nil { + ss.Floats = reuseOrGetFPointSlices(prevSS, numSteps) + } + ss.Floats = append(ss.Floats, FPoint{F: f, T: ts}) } else { - ev.error(ErrTooManySamples(env)) + point := HPoint{H: h, T: ts} + histSize := point.size() + ev.currentSamples += histSize + ev.samplesStats.IncrementSamplesAtStep(step, int64(histSize)) + if ev.currentSamples > ev.maxSamples { + ev.error(ErrTooManySamples(env)) + } + if ss.Histograms == nil { + ss.Histograms = reuseOrGetHPointSlices(prevSS, numSteps) + } + 
ss.Histograms = append(ss.Histograms, point) } } } @@ -1856,7 +1865,7 @@ func (ev *evaluator) eval(expr parser.Expr) (parser.Value, annotations.Annotatio panic(fmt.Errorf("unhandled expression of type: %T", expr)) } -// reuseOrGetFPointSlices reuses the space from previous slice to create new slice if the former has lots of room. +// reuseOrGetHPointSlices reuses the space from previous slice to create new slice if the former has lots of room. // The previous slices capacity is adjusted so when it is re-used from the pool it doesn't overflow into the new one. func reuseOrGetHPointSlices(prevSS *Series, numSteps int) (r []HPoint) { if prevSS != nil && cap(prevSS.Histograms)-2*len(prevSS.Histograms) > 0 { @@ -2168,10 +2177,10 @@ loop: histograms = histograms[:n] continue loop } - if ev.currentSamples >= ev.maxSamples { + ev.currentSamples += histograms[n].size() + if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } - ev.currentSamples += histograms[n].size() } case chunkenc.ValFloat: t, f := buf.At() @@ -2180,10 +2189,10 @@ loop: } // Values in the buffer are guaranteed to be smaller than maxt. if t >= mintFloats { - if ev.currentSamples >= ev.maxSamples { + ev.currentSamples++ + if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } - ev.currentSamples++ if floats == nil { floats = getFPointSlice(16) } @@ -2211,22 +2220,22 @@ loop: histograms = histograms[:n] break } - if ev.currentSamples >= ev.maxSamples { + ev.currentSamples += histograms[n].size() + if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } - ev.currentSamples += histograms[n].size() case chunkenc.ValFloat: t, f := it.At() if t == maxt && !value.IsStaleNaN(f) { - if ev.currentSamples >= ev.maxSamples { + ev.currentSamples++ + if ev.currentSamples > ev.maxSamples { ev.error(ErrTooManySamples(env)) } if floats == nil { floats = getFPointSlice(16) } floats = append(floats, FPoint{T: t, F: f}) - ev.currentSamples++ } } ev.samplesStats.UpdatePeak(ev.currentSamples) diff --git a/promql/engine_test.go b/promql/engine_test.go index 105108d5b2..cc5d0ee780 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -755,6 +755,7 @@ load 10s metricWith3SampleEvery10Seconds{a="1",b="1"} 1+1x100 metricWith3SampleEvery10Seconds{a="2",b="2"} 1+1x100 metricWith3SampleEvery10Seconds{a="3",b="2"} 1+1x100 + metricWith1HistogramEvery10Seconds {{schema:1 count:5 sum:20 buckets:[1 2 1 1]}}+{{schema:1 count:10 sum:5 buckets:[1 2 3 4]}}x100 `) t.Cleanup(func() { storage.Close() }) @@ -795,6 +796,15 @@ load 10s 21000: 1, }, }, + { + Query: "metricWith1HistogramEvery10Seconds", + Start: time.Unix(21, 0), + PeakSamples: 12, + TotalSamples: 12, // 1 histogram sample of size 12 / 10 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 21000: 12, + }, + }, { // timestamp function has a special handling. 
Query: "timestamp(metricWith1SampleEvery10Seconds)", @@ -805,6 +815,15 @@ load 10s 21000: 1, }, }, + { + Query: "timestamp(metricWith1HistogramEvery10Seconds)", + Start: time.Unix(21, 0), + PeakSamples: 13, // histogram size 12 + 1 extra because of timestamp + TotalSamples: 1, // 1 float sample (because of timestamp) / 10 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 21000: 1, + }, + }, { Query: "metricWith1SampleEvery10Seconds", Start: time.Unix(22, 0), @@ -877,11 +896,20 @@ load 10s 201000: 6, }, }, + { + Query: "metricWith1HistogramEvery10Seconds[60s]", + Start: time.Unix(201, 0), + PeakSamples: 72, + TotalSamples: 72, // 1 histogram (size 12) / 10 seconds * 60 seconds + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 72, + }, + }, { Query: "max_over_time(metricWith1SampleEvery10Seconds[59s])[20s:5s]", Start: time.Unix(201, 0), PeakSamples: 10, - TotalSamples: 24, // (1 sample / 10 seconds * 60 seconds) * 60/5 (using 59s so we always return 6 samples + TotalSamples: 24, // (1 sample / 10 seconds * 60 seconds) * 20/5 (using 59s so we always return 6 samples // as if we run a query on 00 looking back 60 seconds we will return 7 samples; // see next test). TotalSamplesPerStep: stats.TotalSamplesPerStep{ @@ -892,12 +920,22 @@ load 10s Query: "max_over_time(metricWith1SampleEvery10Seconds[60s])[20s:5s]", Start: time.Unix(201, 0), PeakSamples: 11, - TotalSamples: 26, // (1 sample / 10 seconds * 60 seconds) + 2 as + TotalSamples: 26, // (1 sample / 10 seconds * 60 seconds) * 4 + 2 as // max_over_time(metricWith1SampleEvery10Seconds[60s]) @ 190 and 200 will return 7 samples. TotalSamplesPerStep: stats.TotalSamplesPerStep{ 201000: 26, }, }, + { + Query: "max_over_time(metricWith1HistogramEvery10Seconds[60s])[20s:5s]", + Start: time.Unix(201, 0), + PeakSamples: 72, + TotalSamples: 312, // (1 histogram (size 12) / 10 seconds * 60 seconds) * 4 + 2 * 12 as + // max_over_time(metricWith1SampleEvery10Seconds[60s]) @ 190 and 200 will return 7 samples. 
+ TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 312, + }, + }, { Query: "metricWith1SampleEvery10Seconds[60s] @ 30", Start: time.Unix(201, 0), @@ -907,6 +945,15 @@ load 10s 201000: 4, }, }, + { + Query: "metricWith1HistogramEvery10Seconds[60s] @ 30", + Start: time.Unix(201, 0), + PeakSamples: 48, + TotalSamples: 48, // the @ modifier forces the evaluation at 30 seconds, so it brings 4 datapoints (0, 10, 20, 30 seconds) * 1 series + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 48, + }, + }, { Query: "sum(max_over_time(metricWith3SampleEvery10Seconds[60s] @ 30))", Start: time.Unix(201, 0), @@ -1035,13 +1082,42 @@ load 10s }, }, { - // timestamp function as a special handling + Query: `metricWith1HistogramEvery10Seconds`, + Start: time.Unix(204, 0), + End: time.Unix(223, 0), + Interval: 5 * time.Second, + PeakSamples: 48, + TotalSamples: 48, // 1 histogram (size 12) per query * 4 steps + TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 204000: 12, // aligned to the step time, not the sample time + 209000: 12, + 214000: 12, + 219000: 12, + }, + }, + { + // timestamp function has a special handling Query: "timestamp(metricWith1SampleEvery10Seconds)", Start: time.Unix(201, 0), End: time.Unix(220, 0), Interval: 5 * time.Second, PeakSamples: 5, - TotalSamples: 4, // (1 sample / 10 seconds) * 4 steps + TotalSamples: 4, // 1 sample per query * 4 steps TotalSamplesPerStep: stats.TotalSamplesPerStep{ + 201000: 1, + 206000: 1, + 211000: 1, + 216000: 1, + }, + }, + { + // timestamp function has a special handling + Query: "timestamp(metricWith1HistogramEvery10Seconds)", + Start: time.Unix(201, 0), + End: time.Unix(220, 0), + Interval: 5 * time.Second, + PeakSamples: 16, + TotalSamples: 4, // 1 sample per query * 4 steps TotalSamplesPerStep: stats.TotalSamplesPerStep{ 201000: 1, 206000: 1, diff --git a/rules/manager.go b/rules/manager.go index e87d55b1e1..165dca144e 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -43,7 +43,7 @@ type QueryFunc func(ctx context.Context, q string, t time.Time) (promql.Vector, // EngineQueryFunc returns a new query function that executes instant queries against // the given engine. // It converts scalar into vector results. -func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc { +func EngineQueryFunc(engine promql.QueryEngine, q storage.Queryable) QueryFunc { return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { q, err := engine.NewInstantQuery(ctx, q, nil, qs, t) if err != nil { diff --git a/scrape/scrape.go b/scrape/scrape.go index f6f3367736..064b0b67bc 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -954,13 +954,14 @@ func (c *scrapeCache) iterDone(flushCache bool) { } } -func (c *scrapeCache) get(met []byte) (*cacheEntry, bool) { +func (c *scrapeCache) get(met []byte) (*cacheEntry, bool, bool) { e, ok := c.series[string(met)] if !ok { - return nil, false + return nil, false, false } + alreadyScraped := e.lastIter == c.iter e.lastIter = c.iter - return e, true + return e, true, alreadyScraped } func (c *scrapeCache) addRef(met []byte, ref storage.SeriesRef, lset labels.Labels, hash uint64) { @@ -1566,7 +1567,7 @@ loop: if sl.cache.getDropped(met) { continue } - ce, ok := sl.cache.get(met) + ce, ok, seriesAlreadyScraped := sl.cache.get(met) var ( ref storage.SeriesRef hash uint64 @@ -1575,6 +1576,7 @@ loop: if ok { ref = ce.ref lset = ce.lset + hash = ce.hash // Update metadata only if it changed in the current iteration. 
updateMetadata(lset, false) @@ -1611,25 +1613,36 @@ loop: updateMetadata(lset, true) } - if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil { - ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs) - if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now. - // CT is an experimental feature. For now, we don't need to fail the - // scrape on errors updating the created timestamp, log debug. - level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err) + if seriesAlreadyScraped { + err = storage.ErrDuplicateSampleForTimestamp + } else { + if ctMs := p.CreatedTimestamp(); sl.enableCTZeroIngestion && ctMs != nil { + ref, err = app.AppendCTZeroSample(ref, lset, t, *ctMs) + if err != nil && !errors.Is(err, storage.ErrOutOfOrderCT) { // OOO is a common case, ignoring completely for now. + // CT is an experimental feature. For now, we don't need to fail the + // scrape on errors updating the created timestamp, log debug. + level.Debug(sl.l).Log("msg", "Error when appending CT in scrape loop", "series", string(met), "ct", *ctMs, "t", t, "err", err) + } + } + + if isHistogram { + if h != nil { + ref, err = app.AppendHistogram(ref, lset, t, h, nil) + } else { + ref, err = app.AppendHistogram(ref, lset, t, nil, fh) + } + } else { + ref, err = app.Append(ref, lset, t, val) } } - if isHistogram { - if h != nil { - ref, err = app.AppendHistogram(ref, lset, t, h, nil) - } else { - ref, err = app.AppendHistogram(ref, lset, t, nil, fh) + if err == nil { + if (parsedTimestamp == nil || sl.trackTimestampsStaleness) && ce != nil { + sl.cache.trackStaleness(ce.hash, ce.lset) } - } else { - ref, err = app.Append(ref, lset, t, val) } - sampleAdded, err = sl.checkAddError(ce, met, parsedTimestamp, err, &sampleLimitErr, &bucketLimitErr, &appErrs) + + sampleAdded, err = sl.checkAddError(met, err, &sampleLimitErr, &bucketLimitErr, &appErrs) if err != nil { if !errors.Is(err, storage.ErrNotFound) { level.Debug(sl.l).Log("msg", "Unexpected error", "series", string(met), "err", err) @@ -1650,6 +1663,8 @@ loop: // Increment added even if there's an error so we correctly report the // number of samples remaining after relabeling. + // We still report duplicated samples here since this number should be the exact number + // of time series exposed on a scrape after relabelling. added++ exemplars = exemplars[:0] // Reset and reuse the exemplar slice. for hasExemplar := p.Exemplar(&e); hasExemplar; hasExemplar = p.Exemplar(&e) { @@ -1744,12 +1759,9 @@ loop: // Adds samples to the appender, checking the error, and then returns the # of samples added, // whether the caller should continue to process more samples, and any sample or bucket limit errors. 
-func (sl *scrapeLoop) checkAddError(ce *cacheEntry, met []byte, tp *int64, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) { +func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) { switch { case err == nil: - if (tp == nil || sl.trackTimestampsStaleness) && ce != nil { - sl.cache.trackStaleness(ce.hash, ce.lset) - } return true, nil case errors.Is(err, storage.ErrNotFound): return false, storage.ErrNotFound @@ -1872,7 +1884,7 @@ func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err er } func (sl *scrapeLoop) addReportSample(app storage.Appender, s []byte, t int64, v float64, b *labels.Builder) error { - ce, ok := sl.cache.get(s) + ce, ok, _ := sl.cache.get(s) var ref storage.SeriesRef var lset labels.Labels if ok { diff --git a/scrape/scrape_test.go b/scrape/scrape_test.go index bcaeb460e2..0dfca538d8 100644 --- a/scrape/scrape_test.go +++ b/scrape/scrape_test.go @@ -1068,6 +1068,7 @@ func makeTestMetrics(n int) []byte { fmt.Fprintf(&sb, "# HELP metric_a help text\n") fmt.Fprintf(&sb, "metric_a{foo=\"%d\",bar=\"%d\"} 1\n", i, i*100) } + fmt.Fprintf(&sb, "# EOF\n") return sb.Bytes() } @@ -2635,6 +2636,9 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) { _, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{}) require.Error(t, err) require.NoError(t, slApp.Rollback()) + // We need to cycle staleness cache maps after a manual rollback. Otherwise they will have old entries in them, + // which would cause ErrDuplicateSampleForTimestamp errors on the next append. + sl.cache.iterDone(true) q, err := s.Querier(time.Time{}.UnixNano(), 0) require.NoError(t, err) @@ -2971,7 +2975,7 @@ func TestReuseCacheRace(t *testing.T) { func TestCheckAddError(t *testing.T) { var appErrs appendErrors sl := scrapeLoop{l: log.NewNopLogger(), metrics: newTestScrapeMetrics(t)} - sl.checkAddError(nil, nil, nil, storage.ErrOutOfOrderSample, nil, nil, &appErrs) + sl.checkAddError(nil, storage.ErrOutOfOrderSample, nil, nil, &appErrs) require.Equal(t, 1, appErrs.numOutOfOrder) } @@ -3599,3 +3603,31 @@ func BenchmarkTargetScraperGzip(b *testing.B) { }) } } + +// When a scrape contains multiple instances for the same time series we should increment +// prometheus_target_scrapes_sample_duplicate_timestamp_total metric. 
+func TestScrapeLoopSeriesAddedDuplicates(t *testing.T) { + ctx, sl := simpleTestScrapeLoop(t) + + slApp := sl.appender(ctx) + total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\ntest_metric 2\ntest_metric 3\n"), "", time.Time{}) + require.NoError(t, err) + require.NoError(t, slApp.Commit()) + require.Equal(t, 3, total) + require.Equal(t, 3, added) + require.Equal(t, 1, seriesAdded) + + slApp = sl.appender(ctx) + total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\ntest_metric 1\ntest_metric 1\n"), "", time.Time{}) + require.NoError(t, err) + require.NoError(t, slApp.Commit()) + require.Equal(t, 3, total) + require.Equal(t, 3, added) + require.Equal(t, 0, seriesAdded) + + metric := dto.Metric{} + err = sl.metrics.targetScrapeSampleDuplicate.Write(&metric) + require.NoError(t, err) + value := metric.GetCounter().GetValue() + require.Equal(t, 4.0, value) +} diff --git a/scripts/sync_repo_files.sh b/scripts/sync_repo_files.sh index 6965a45452..6459fb1e7a 100755 --- a/scripts/sync_repo_files.sh +++ b/scripts/sync_repo_files.sh @@ -37,7 +37,7 @@ if [ -z "${GITHUB_TOKEN}" ]; then fi # List of files that should be synced. -SYNC_FILES="CODE_OF_CONDUCT.md LICENSE Makefile.common SECURITY.md .yamllint scripts/golangci-lint.yml .github/workflows/scorecards.yml" +SYNC_FILES="CODE_OF_CONDUCT.md LICENSE Makefile.common SECURITY.md .yamllint scripts/golangci-lint.yml .github/workflows/scorecards.yml .github/workflows/container_description.yml" # Go to the root of the repo cd "$(git rev-parse --show-cdup)" || exit 1 @@ -99,6 +99,15 @@ check_go() { curl -sLf -o /dev/null "https://raw.githubusercontent.com/${org_repo}/${default_branch}/go.mod" } +check_docker() { + local org_repo + local default_branch + org_repo="$1" + default_branch="$2" + + curl -sLf -o /dev/null "https://raw.githubusercontent.com/${org_repo}/${default_branch}/Dockerfile" +} + process_repo() { local org_repo local default_branch @@ -119,6 +128,10 @@ process_repo() { echo "${org_repo} is not Go, skipping golangci-lint.yml." continue fi + if [[ "${source_file}" == '.github/workflows/container_description.yml' ]] && ! check_docker "${org_repo}" "${default_branch}" ; then + echo "${org_repo} has no Dockerfile, skipping container_description.yml." + continue + fi if [[ "${source_file}" == 'LICENSE' ]] && ! check_license "${target_file}" ; then echo "LICENSE in ${org_repo} is not apache, skipping." continue @@ -131,7 +144,7 @@ process_repo() { if [[ -z "${target_file}" ]]; then echo "${target_filename} doesn't exist in ${org_repo}" case "${source_file}" in - CODE_OF_CONDUCT.md | SECURITY.md) + CODE_OF_CONDUCT.md | SECURITY.md | .github/workflows/container_description.yml) echo "${source_file} missing in ${org_repo}, force updating." needs_update+=("${source_file}") ;; diff --git a/storage/remote/azuread/azuread.go b/storage/remote/azuread/azuread.go index 20d48d0087..e2058fb54d 100644 --- a/storage/remote/azuread/azuread.go +++ b/storage/remote/azuread/azuread.go @@ -61,6 +61,12 @@ type OAuthConfig struct { TenantID string `yaml:"tenant_id,omitempty"` } +// SDKConfig is used to store azure SDK config values. +type SDKConfig struct { + // TenantID is the tenantId of the azure active directory application that is being used to authenticate. + TenantID string `yaml:"tenant_id,omitempty"` +} + // AzureADConfig is used to store the config values. type AzureADConfig struct { //nolint:revive // exported. // ManagedIdentity is the managed identity that is being used to authenticate. 
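The `sdk` authentication block added to `storage/remote/azuread` below delegates credential discovery to the Azure SDK's default chain (environment variables, workload identity, managed identity, Azure CLI). A standalone sketch of what that chain does when a token is later requested; the scope here is illustrative, since the real one is derived from the configured Azure cloud:

```go
package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
)

func main() {
	// DefaultAzureCredential tries several sources in order, e.g.
	// AZURE_TENANT_ID/AZURE_CLIENT_ID/AZURE_CLIENT_SECRET from the
	// environment, then managed identity, then the Azure CLI login.
	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}
	// Illustrative scope; the remote-write code builds the real one
	// from the audience of the configured cloud.
	tok, err := cred.GetToken(context.Background(), policy.TokenRequestOptions{
		Scopes: []string{"https://monitor.azure.com/.default"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("token expires at:", tok.ExpiresOn)
}
```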
@@ -69,6 +75,9 @@ type AzureADConfig struct { //nolint:revive // exported. // OAuth is the oauth config that is being used to authenticate. OAuth *OAuthConfig `yaml:"oauth,omitempty"` + // SDK is the SDK config that is being used to authenticate. + SDK *SDKConfig `yaml:"sdk,omitempty"` + // Cloud is the Azure cloud in which the service is running. Example: AzurePublic/AzureGovernment/AzureChina. Cloud string `yaml:"cloud,omitempty"` } @@ -102,14 +111,22 @@ func (c *AzureADConfig) Validate() error { return fmt.Errorf("must provide a cloud in the Azure AD config") } - if c.ManagedIdentity == nil && c.OAuth == nil { - return fmt.Errorf("must provide an Azure Managed Identity or Azure OAuth in the Azure AD config") + if c.ManagedIdentity == nil && c.OAuth == nil && c.SDK == nil { + return fmt.Errorf("must provide an Azure Managed Identity, Azure OAuth or Azure SDK in the Azure AD config") } if c.ManagedIdentity != nil && c.OAuth != nil { return fmt.Errorf("cannot provide both Azure Managed Identity and Azure OAuth in the Azure AD config") } + if c.ManagedIdentity != nil && c.SDK != nil { + return fmt.Errorf("cannot provide both Azure Managed Identity and Azure SDK in the Azure AD config") + } + + if c.OAuth != nil && c.SDK != nil { + return fmt.Errorf("cannot provide both Azure OAuth and Azure SDK in the Azure AD config") + } + if c.ManagedIdentity != nil { if c.ManagedIdentity.ClientID == "" { return fmt.Errorf("must provide an Azure Managed Identity client_id in the Azure AD config") @@ -143,6 +160,15 @@ func (c *AzureADConfig) Validate() error { } } + if c.SDK != nil { + if c.SDK.TenantID != "" { + ok, err := regexp.MatchString("^[0-9a-zA-Z-.]+$", c.SDK.TenantID) + if err != nil || !ok { + return fmt.Errorf("the provided Azure SDK tenant_id is invalid") + } + } + } + return nil } @@ -225,6 +251,16 @@ func newTokenCredential(cfg *AzureADConfig) (azcore.TokenCredential, error) { } } + if cfg.SDK != nil { + sdkConfig := &SDKConfig{ + TenantID: cfg.SDK.TenantID, + } + cred, err = newSDKTokenCredential(clientOpts, sdkConfig) + if err != nil { + return nil, err + } + } + return cred, nil } @@ -241,6 +277,12 @@ func newOAuthTokenCredential(clientOpts *azcore.ClientOptions, oAuthConfig *OAut return azidentity.NewClientSecretCredential(oAuthConfig.TenantID, oAuthConfig.ClientID, oAuthConfig.ClientSecret, opts) } +// newSDKTokenCredential returns a new SDK token credential. +func newSDKTokenCredential(clientOpts *azcore.ClientOptions, sdkConfig *SDKConfig) (azcore.TokenCredential, error) { + opts := &azidentity.DefaultAzureCredentialOptions{ClientOptions: *clientOpts, TenantID: sdkConfig.TenantID} + return azidentity.NewDefaultAzureCredential(opts) +} + // newTokenProvider helps to fetch accessToken for different types of credential. This also takes care of // refreshing the accessToken before expiry. This accessToken is attached to the Authorization header while making requests. 
func newTokenProvider(cfg *AzureADConfig, cred azcore.TokenCredential) (*tokenProvider, error) { diff --git a/storage/remote/azuread/azuread_test.go b/storage/remote/azuread/azuread_test.go index 5eed2c0b19..7c97138120 100644 --- a/storage/remote/azuread/azuread_test.go +++ b/storage/remote/azuread/azuread_test.go @@ -39,7 +39,7 @@ const ( testTokenString = "testTokenString" ) -var testTokenExpiry = time.Now().Add(5 * time.Second) +func testTokenExpiry() time.Time { return time.Now().Add(5 * time.Second) } type AzureAdTestSuite struct { suite.Suite @@ -94,7 +94,7 @@ func (ad *AzureAdTestSuite) TestAzureAdRoundTripper() { testToken := &azcore.AccessToken{ Token: testTokenString, - ExpiresOn: testTokenExpiry, + ExpiresOn: testTokenExpiry(), } ad.mockCredential.On("GetToken", mock.Anything, mock.Anything).Return(*testToken, nil) @@ -145,7 +145,7 @@ func TestAzureAdConfig(t *testing.T) { // Missing managedidentiy or oauth field. { filename: "testdata/azuread_bad_configmissing.yaml", - err: "must provide an Azure Managed Identity or Azure OAuth in the Azure AD config", + err: "must provide an Azure Managed Identity, Azure OAuth or Azure SDK in the Azure AD config", }, // Invalid managedidentity client id. { @@ -162,6 +162,11 @@ func TestAzureAdConfig(t *testing.T) { filename: "testdata/azuread_bad_twoconfig.yaml", err: "cannot provide both Azure Managed Identity and Azure OAuth in the Azure AD config", }, + // Invalid config when both sdk and oauth is provided. + { + filename: "testdata/azuread_bad_oauthsdkconfig.yaml", + err: "cannot provide both Azure OAuth and Azure SDK in the Azure AD config", + }, // Valid config with missing optionally cloud field. { filename: "testdata/azuread_good_cloudmissing.yaml", @@ -174,6 +179,10 @@ func TestAzureAdConfig(t *testing.T) { { filename: "testdata/azuread_good_oauth.yaml", }, + // Valid SDK config. + { + filename: "testdata/azuread_good_sdk.yaml", + }, } for _, c := range cases { _, err := loadAzureAdConfig(c.filename) @@ -232,6 +241,16 @@ func (s *TokenProviderTestSuite) TestNewTokenProvider() { }, err: "Cloud is not specified or is incorrect: ", }, + // Invalid tokenProvider for SDK. + { + cfg: &AzureADConfig{ + Cloud: "PublicAzure", + SDK: &SDKConfig{ + TenantID: dummyTenantID, + }, + }, + err: "Cloud is not specified or is incorrect: ", + }, // Valid tokenProvider for managedidentity. { cfg: &AzureADConfig{ @@ -252,6 +271,15 @@ func (s *TokenProviderTestSuite) TestNewTokenProvider() { }, }, }, + // Valid tokenProvider for SDK. + { + cfg: &AzureADConfig{ + Cloud: "AzurePublic", + SDK: &SDKConfig{ + TenantID: dummyTenantID, + }, + }, + }, } mockGetTokenCallCounter := 1 for _, c := range cases { @@ -264,11 +292,11 @@ func (s *TokenProviderTestSuite) TestNewTokenProvider() { } else { testToken := &azcore.AccessToken{ Token: testTokenString, - ExpiresOn: testTokenExpiry, + ExpiresOn: testTokenExpiry(), } s.mockCredential.On("GetToken", mock.Anything, mock.Anything).Return(*testToken, nil).Once(). 
- On("GetToken", mock.Anything, mock.Anything).Return(getToken(), nil) + On("GetToken", mock.Anything, mock.Anything).Return(getToken(), nil).Once() actualTokenProvider, actualErr := newTokenProvider(c.cfg, s.mockCredential) diff --git a/storage/remote/azuread/testdata/azuread_bad_oauthsdkconfig.yaml b/storage/remote/azuread/testdata/azuread_bad_oauthsdkconfig.yaml new file mode 100644 index 0000000000..825759d313 --- /dev/null +++ b/storage/remote/azuread/testdata/azuread_bad_oauthsdkconfig.yaml @@ -0,0 +1,7 @@ +cloud: AzurePublic +oauth: + client_id: 00000000-0000-0000-0000-000000000000 + client_secret: Cl1ent$ecret! + tenant_id: 00000000-a12b-3cd4-e56f-000000000000 +sdk: + tenant_id: 00000000-a12b-3cd4-e56f-000000000000 diff --git a/storage/remote/azuread/testdata/azuread_good_sdk.yaml b/storage/remote/azuread/testdata/azuread_good_sdk.yaml new file mode 100644 index 0000000000..53de8897d5 --- /dev/null +++ b/storage/remote/azuread/testdata/azuread_good_sdk.yaml @@ -0,0 +1,3 @@ +cloud: AzurePublic +sdk: + tenant_id: 00000000-a12b-3cd4-e56f-000000000000 diff --git a/storage/remote/otlptranslator/README.md b/storage/remote/otlptranslator/README.md index c2b04e5aff..774fac5a7f 100644 --- a/storage/remote/otlptranslator/README.md +++ b/storage/remote/otlptranslator/README.md @@ -3,7 +3,6 @@ This files in the `prometheus/` and `prometheusremotewrite/` are copied from the OpenTelemetry Project[^1]. This is done instead of adding a go.mod dependency because OpenTelemetry depends on `prometheus/prometheus` and a cyclic dependency will be created. This is just a temporary solution and the long-term solution is to move the required packages from OpenTelemetry into `prometheus/prometheus`. -We don't copy in `./prometheus` through this script because that package imports a collector specific featuregate package we don't want to import. The featuregate package is being removed now, and in the future we will copy this folder too. To update the dependency is a multi-step process: 1. Vendor the latest `prometheus/prometheus`@`main` into [`opentelemetry/opentelemetry-collector-contrib`](https://github.com/open-telemetry/opentelemetry-collector-contrib) @@ -20,4 +19,4 @@ This means if we depend on the upstream packages directly, we will never able to When we do want to make changes to the types in `prompb`, we might need to edit the files directly. That is OK, please let @gouthamve or @jesusvazquez know so they can take care of updating the upstream code (by vendoring in `prometheus/prometheus` upstream and resolving conflicts) and then will run the copy script again to keep things updated. 
-[^1]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/translator/prometheus and https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/translator/prometheusremotewrite \ No newline at end of file +[^1]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/translator/prometheus and https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/pkg/translator/prometheusremotewrite diff --git a/storage/remote/otlptranslator/prometheus/normalize_label.go b/storage/remote/otlptranslator/prometheus/normalize_label.go index c02800c8bc..a6b41d1c37 100644 --- a/storage/remote/otlptranslator/prometheus/normalize_label.go +++ b/storage/remote/otlptranslator/prometheus/normalize_label.go @@ -3,7 +3,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package prometheus // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus" +package prometheus // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" import ( "strings" diff --git a/storage/remote/otlptranslator/prometheus/normalize_name.go b/storage/remote/otlptranslator/prometheus/normalize_name.go index 7ae233a187..a976dfb485 100644 --- a/storage/remote/otlptranslator/prometheus/normalize_name.go +++ b/storage/remote/otlptranslator/prometheus/normalize_name.go @@ -3,7 +3,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package prometheus // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus" +package prometheus // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" import ( "strings" diff --git a/storage/remote/otlptranslator/prometheus/unit_to_ucum.go b/storage/remote/otlptranslator/prometheus/unit_to_ucum.go index 4a72e683bb..718a520675 100644 --- a/storage/remote/otlptranslator/prometheus/unit_to_ucum.go +++ b/storage/remote/otlptranslator/prometheus/unit_to_ucum.go @@ -3,7 +3,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package prometheus // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus" +package prometheus // import "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus" import "strings" diff --git a/storage/remote/otlptranslator/update-copy.sh b/storage/remote/otlptranslator/update-copy.sh index f90be5eedc..8aa645e0bd 100755 --- a/storage/remote/otlptranslator/update-copy.sh +++ b/storage/remote/otlptranslator/update-copy.sh @@ -23,5 +23,5 @@ case $(sed --help 2>&1) in *) set sed -i '';; esac -"$@" -e 's#github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus#github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus#g' ./prometheusremotewrite/*.go +"$@" -e 's#github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus#github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus#g' ./prometheusremotewrite/*.go ./prometheus/*.go "$@" -e '1s#^#// DO NOT EDIT. COPIED AS-IS. 
SEE ../README.md\n\n#g' ./prometheusremotewrite/*.go ./prometheus/*.go diff --git a/storage/remote/read_handler.go b/storage/remote/read_handler.go index ffc64c9c3f..2a00ce897f 100644 --- a/storage/remote/read_handler.go +++ b/storage/remote/read_handler.go @@ -202,34 +202,16 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re return err } - querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs) - if err != nil { + chunks := h.getChunkSeriesSet(ctx, query, filteredMatchers) + if err := chunks.Err(); err != nil { return err } - defer func() { - if err := querier.Close(); err != nil { - level.Warn(h.logger).Log("msg", "Error on chunk querier close", "err", err.Error()) - } - }() - - var hints *storage.SelectHints - if query.Hints != nil { - hints = &storage.SelectHints{ - Start: query.Hints.StartMs, - End: query.Hints.EndMs, - Step: query.Hints.StepMs, - Func: query.Hints.Func, - Grouping: query.Hints.Grouping, - Range: query.Hints.RangeMs, - By: query.Hints.By, - } - } ws, err := StreamChunkedReadResponses( NewChunkedWriter(w, f), int64(i), // The streaming API has to provide the series sorted. - querier.Select(ctx, true, hints, filteredMatchers...), + chunks, sortedExternalLabels, h.remoteReadMaxBytesInFrame, h.marshalPool, @@ -254,6 +236,35 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re } } +// getChunkSeriesSet executes a query to retrieve a ChunkSeriesSet, +// encapsulating the operation in its own function to ensure timely release of +// the querier resources. +func (h *readHandler) getChunkSeriesSet(ctx context.Context, query *prompb.Query, filteredMatchers []*labels.Matcher) storage.ChunkSeriesSet { + querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs) + if err != nil { + return storage.ErrChunkSeriesSet(err) + } + defer func() { + if err := querier.Close(); err != nil { + level.Warn(h.logger).Log("msg", "Error on chunk querier close", "err", err.Error()) + } + }() + + var hints *storage.SelectHints + if query.Hints != nil { + hints = &storage.SelectHints{ + Start: query.Hints.StartMs, + End: query.Hints.EndMs, + Step: query.Hints.StepMs, + Func: query.Hints.Func, + Grouping: query.Hints.Grouping, + Range: query.Hints.RangeMs, + By: query.Hints.By, + } + } + return querier.Select(ctx, true, hints, filteredMatchers...) +} + // filterExtLabelsFromMatchers change equality matchers which match external labels // to a matcher that looks for an empty label, // as that label should not be present in the storage. 
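The `read_handler.go` refactor above relies on an errors-as-values pattern: instead of returning `(set, err)`, `getChunkSeriesSet` folds any failure into the returned set, and the caller checks `chunks.Err()` once before streaming. A cut-down model of that pattern (interfaces simplified; the real `ErrChunkSeriesSet` lives in the `storage` package):

```go
package main

import (
	"errors"
	"fmt"
)

// SeriesSet is a reduced model of storage.ChunkSeriesSet: an iterator
// that can carry a deferred error.
type SeriesSet interface {
	Next() bool
	Err() error
}

type errSeriesSet struct{ err error }

func (s errSeriesSet) Next() bool { return false }
func (s errSeriesSet) Err() error { return s.err }

// errSet mirrors storage.ErrChunkSeriesSet: a failed querier becomes an
// empty set whose Err() reports the problem, giving callers one
// consumption path for both success and failure.
func errSet(err error) SeriesSet { return errSeriesSet{err: err} }

func main() {
	s := errSet(errors.New("querier unavailable"))
	for s.Next() {
		// never entered: the set is empty
	}
	fmt.Println(s.Err()) // querier unavailable
}
```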
diff --git a/tsdb/block_test.go b/tsdb/block_test.go index ad7eb55575..21e20a61c8 100644 --- a/tsdb/block_test.go +++ b/tsdb/block_test.go @@ -209,6 +209,22 @@ func TestCorruptedChunk(t *testing.T) { } } +func sequenceFiles(dir string) ([]string, error) { + files, err := os.ReadDir(dir) + if err != nil { + return nil, err + } + var res []string + + for _, fi := range files { + if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil { + continue + } + res = append(res, filepath.Join(dir, fi.Name())) + } + return res, nil +} + func TestLabelValuesWithMatchers(t *testing.T) { tmpdir := t.TempDir() ctx := context.Background() diff --git a/tsdb/chunks/chunks.go b/tsdb/chunks/chunks.go index 543b98c289..0826f69670 100644 --- a/tsdb/chunks/chunks.go +++ b/tsdb/chunks/chunks.go @@ -202,15 +202,6 @@ func ChunkFromSamplesGeneric(s Samples) (Meta, error) { }, nil } -// PopulatedChunk creates a chunk populated with samples every second starting at minTime. -func PopulatedChunk(numSamples int, minTime int64) (Meta, error) { - samples := make([]Sample, numSamples) - for i := 0; i < numSamples; i++ { - samples[i] = sample{t: minTime + int64(i*1000), f: 1.0} - } - return ChunkFromSamples(samples) -} - // ChunkMetasToSamples converts a slice of chunk meta data to a slice of samples. // Used in tests to compare the content of chunks. func ChunkMetasToSamples(chunks []Meta) (result []Sample) { diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index 5ba5381320..087f25fbb3 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -15,7 +15,6 @@ package chunks import ( "bufio" - "bytes" "encoding/binary" "errors" "fmt" @@ -690,7 +689,6 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error sgmIndex, chkStart := ref.Unpack() // We skip the series ref and the mint/maxt beforehand. chkStart += SeriesRefSize + (2 * MintMaxtSize) - chkCRC32 := newCRC32() // If it is the current open file, then the chunks can be in the buffer too. if sgmIndex == cdm.curFileSequence { @@ -755,20 +753,13 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error // Check the CRC. sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize) - if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil { + if err := checkCRC32(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd), sum); err != nil { return nil, &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: sgmIndex, Err: err, } } - if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: fmt.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), - } - } // The chunk data itself. chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd) @@ -802,8 +793,6 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu cdm.fileMaxtSet = true }() - chkCRC32 := newCRC32() - // Iterate files in ascending order. 
segIDs := make([]int, 0, len(cdm.mmappedChunkFiles)) for seg := range cdm.mmappedChunkFiles { @@ -838,7 +827,6 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu " - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID), } } - chkCRC32.Reset() chunkRef := newChunkDiskMapperRef(uint64(segID), uint64(idx)) startIdx := idx @@ -877,14 +865,11 @@ func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chu // Check CRC. sum := mmapFile.byteSlice.Range(idx, idx+CRCSize) - if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(startIdx, idx)); err != nil { - return err - } - if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { + if err := checkCRC32(mmapFile.byteSlice.Range(startIdx, idx), sum); err != nil { return &CorruptionErr{ Dir: cdm.dir.Name(), FileIndex: segID, - Err: fmt.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), + Err: err, } } idx += CRCSize diff --git a/tsdb/db.go b/tsdb/db.go index 4998da6aa7..e49c5811d2 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -24,7 +24,6 @@ import ( "os" "path/filepath" "slices" - "strconv" "strings" "sync" "time" @@ -779,10 +778,6 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs walDir := filepath.Join(dir, "wal") wblDir := filepath.Join(dir, wlog.WblDirName) - // Migrate old WAL if one exists. - if err := MigrateWAL(l, walDir); err != nil { - return nil, fmt.Errorf("migrate WAL: %w", err) - } for _, tmpDir := range []string{walDir, dir} { // Remove tmp dirs. if err := removeBestEffortTmpDirs(l, tmpDir); err != nil { @@ -1615,7 +1610,7 @@ func BeyondTimeRetention(db *DB, blocks []*Block) (deletable map[ulid.ULID]struc for i, block := range blocks { // The difference between the first block and this block is larger than // the retention period so any blocks after that are added as deletable. - if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > db.opts.RetentionDuration { + if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime >= db.opts.RetentionDuration { for _, b := range blocks[i:] { deletable[b.meta.ULID] = struct{}{} } @@ -2213,39 +2208,6 @@ func blockDirs(dir string) ([]string, error) { return dirs, nil } -func sequenceFiles(dir string) ([]string, error) { - files, err := os.ReadDir(dir) - if err != nil { - return nil, err - } - var res []string - - for _, fi := range files { - if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil { - continue - } - res = append(res, filepath.Join(dir, fi.Name())) - } - return res, nil -} - -func nextSequenceFile(dir string) (string, int, error) { - files, err := os.ReadDir(dir) - if err != nil { - return "", 0, err - } - - i := uint64(0) - for _, f := range files { - j, err := strconv.ParseUint(f.Name(), 10, 64) - if err != nil { - continue - } - i = j - } - return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil -} - func exponential(d, min, max time.Duration) time.Duration { d *= 2 if d < min { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 45a16b8ba3..498e26c588 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -681,6 +681,34 @@ func TestDB_Snapshot(t *testing.T) { require.Equal(t, 1000.0, sum) } +func TestDB_BeyondTimeRetention(t *testing.T) { + opts := DefaultOptions() + opts.RetentionDuration = 100 + db := openTestDB(t, opts, nil) + defer func() { + require.NoError(t, db.Close()) + }() + + // We have 4 blocks, 3 of which are beyond the retention duration. 
+ metas := []BlockMeta{ + {MinTime: 300, MaxTime: 500}, + {MinTime: 200, MaxTime: 300}, + {MinTime: 100, MaxTime: 200}, + {MinTime: 0, MaxTime: 100}, + } + + for _, m := range metas { + createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) + } + + // Reloading should truncate the 3 blocks which are >= the retention period. + require.NoError(t, db.reloadBlocks()) + blocks := db.Blocks() + require.Len(t, blocks, 1) + require.Equal(t, metas[0].MinTime, blocks[0].Meta().MinTime) + require.Equal(t, metas[0].MaxTime, blocks[0].Meta().MaxTime) +} + // TestDB_Snapshot_ChunksOutsideOfCompactedRange ensures that a snapshot removes chunks samples // that are outside the set block time range. // See https://github.com/prometheus/prometheus/issues/5105 @@ -3598,7 +3626,7 @@ func testChunkQuerierShouldNotPanicIfHeadChunkIsTruncatedWhileReadingQueriedChun // just to iterate through the bytes slice. We don't really care the reason why // we read this data, we just need to read it to make sure the memory address // of the []byte is still valid. - chkCRC32 := newCRC32() + chkCRC32 := crc32.New(crc32.MakeTable(crc32.Castagnoli)) for _, chunk := range chunks { chkCRC32.Reset() _, err := chkCRC32.Write(chunk.Bytes()) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 7aae4c2645..7ab890b99e 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -1829,7 +1829,7 @@ func NewStringListIter(s []string) StringIter { return &stringListIter{l: s} } -// symbolsIter implements StringIter. +// stringListIter implements StringIter. type stringListIter struct { l []string cur string diff --git a/tsdb/wal.go b/tsdb/wal.go deleted file mode 100644 index e06a8aea53..0000000000 --- a/tsdb/wal.go +++ /dev/null @@ -1,1303 +0,0 @@ -// Copyright 2017 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tsdb - -import ( - "bufio" - "encoding/binary" - "errors" - "fmt" - "hash" - "hash/crc32" - "io" - "math" - "os" - "path/filepath" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/prometheus/client_golang/prometheus" - - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/encoding" - "github.com/prometheus/prometheus/tsdb/fileutil" - "github.com/prometheus/prometheus/tsdb/record" - "github.com/prometheus/prometheus/tsdb/tombstones" - "github.com/prometheus/prometheus/tsdb/wlog" - "github.com/prometheus/prometheus/util/zeropool" -) - -// WALEntryType indicates what data a WAL entry contains. -type WALEntryType uint8 - -const ( - // WALMagic is a 4 byte number every WAL segment file starts with. - WALMagic = uint32(0x43AF00EF) - - // WALFormatDefault is the version flag for the default outer segment file format. - WALFormatDefault = byte(1) -) - -// Entry types in a segment file. 
-const ( - WALEntrySymbols WALEntryType = 1 - WALEntrySeries WALEntryType = 2 - WALEntrySamples WALEntryType = 3 - WALEntryDeletes WALEntryType = 4 -) - -type walMetrics struct { - fsyncDuration prometheus.Summary - corruptions prometheus.Counter -} - -func newWalMetrics(r prometheus.Registerer) *walMetrics { - m := &walMetrics{} - - m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "prometheus_tsdb_wal_fsync_duration_seconds", - Help: "Duration of WAL fsync.", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }) - m.corruptions = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_corruptions_total", - Help: "Total number of WAL corruptions.", - }) - - if r != nil { - r.MustRegister( - m.fsyncDuration, - m.corruptions, - ) - } - return m -} - -// WAL is a write ahead log that can log new series labels and samples. -// It must be completely read before new entries are logged. -// -// Deprecated: use wlog pkg combined with the record codex instead. -type WAL interface { - Reader() WALReader - LogSeries([]record.RefSeries) error - LogSamples([]record.RefSample) error - LogDeletes([]tombstones.Stone) error - Truncate(mint int64, keep func(uint64) bool) error - Close() error -} - -// WALReader reads entries from a WAL. -type WALReader interface { - Read( - seriesf func([]record.RefSeries), - samplesf func([]record.RefSample), - deletesf func([]tombstones.Stone), - ) error -} - -// segmentFile wraps a file object of a segment and tracks the highest timestamp -// it contains. During WAL truncating, all segments with no higher timestamp than -// the truncation threshold can be compacted. -type segmentFile struct { - *os.File - maxTime int64 // highest tombstone or sample timestamp in segment - minSeries chunks.HeadSeriesRef // lowerst series ID in segment -} - -func newSegmentFile(f *os.File) *segmentFile { - return &segmentFile{ - File: f, - maxTime: math.MinInt64, - minSeries: math.MaxUint64, - } -} - -const ( - walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB -) - -// The table gets initialized with sync.Once but may still cause a race -// with any other use of the crc32 package anywhere. Thus we initialize it -// before. -var castagnoliTable *crc32.Table - -func init() { - castagnoliTable = crc32.MakeTable(crc32.Castagnoli) -} - -// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the -// polynomial may be easily changed in one location at a later time, if necessary. -func newCRC32() hash.Hash32 { - return crc32.New(castagnoliTable) -} - -// SegmentWAL is a write ahead log for series data. -// -// Deprecated: use wlog pkg combined with the record coders instead. -type SegmentWAL struct { - mtx sync.Mutex - metrics *walMetrics - - dirFile *os.File - files []*segmentFile - - logger log.Logger - flushInterval time.Duration - segmentSize int64 - - crc32 hash.Hash32 - cur *bufio.Writer - curN int64 - - stopc chan struct{} - donec chan struct{} - actorc chan func() error // sequentialized background operations - buffers sync.Pool -} - -// OpenSegmentWAL opens or creates a write ahead log in the given directory. -// The WAL must be read completely before new data is written. 
-func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration, r prometheus.Registerer) (*SegmentWAL, error) { - if err := os.MkdirAll(dir, 0o777); err != nil { - return nil, err - } - df, err := fileutil.OpenDir(dir) - if err != nil { - return nil, err - } - if logger == nil { - logger = log.NewNopLogger() - } - - w := &SegmentWAL{ - dirFile: df, - logger: logger, - flushInterval: flushInterval, - donec: make(chan struct{}), - stopc: make(chan struct{}), - actorc: make(chan func() error, 2), - segmentSize: walSegmentSizeBytes, - crc32: newCRC32(), - } - w.metrics = newWalMetrics(r) - - fns, err := sequenceFiles(w.dirFile.Name()) - if err != nil { - return nil, err - } - - for i, fn := range fns { - f, err := w.openSegmentFile(fn) - if err == nil { - w.files = append(w.files, newSegmentFile(f)) - continue - } - level.Warn(logger).Log("msg", "Invalid segment file detected, truncating WAL", "err", err, "file", fn) - - for _, fn := range fns[i:] { - if err := os.Remove(fn); err != nil { - return w, fmt.Errorf("removing segment failed: %w", err) - } - } - break - } - - go w.run(flushInterval) - - return w, nil -} - -// repairingWALReader wraps a WAL reader and truncates its underlying SegmentWAL after the last -// valid entry if it encounters corruption. -type repairingWALReader struct { - wal *SegmentWAL - r WALReader -} - -func (r *repairingWALReader) Read( - seriesf func([]record.RefSeries), - samplesf func([]record.RefSample), - deletesf func([]tombstones.Stone), -) error { - err := r.r.Read(seriesf, samplesf, deletesf) - if err == nil { - return nil - } - var cerr *walCorruptionErr - if !errors.As(err, &cerr) { - return err - } - r.wal.metrics.corruptions.Inc() - return r.wal.truncate(cerr.err, cerr.file, cerr.lastOffset) -} - -// truncate the WAL after the last valid entry. -func (w *SegmentWAL) truncate(err error, file int, lastOffset int64) error { - level.Error(w.logger).Log("msg", "WAL corruption detected; truncating", - "err", err, "file", w.files[file].Name(), "pos", lastOffset) - - // Close and delete all files after the current one. - for _, f := range w.files[file+1:] { - if err := f.Close(); err != nil { - return err - } - if err := os.Remove(f.Name()); err != nil { - return err - } - } - w.mtx.Lock() - defer w.mtx.Unlock() - - w.files = w.files[:file+1] - - // Seek the current file to the last valid offset where we continue writing from. - _, err = w.files[file].Seek(lastOffset, io.SeekStart) - return err -} - -// Reader returns a new reader over the write ahead log data. -// It must be completely consumed before writing to the WAL. -func (w *SegmentWAL) Reader() WALReader { - return &repairingWALReader{ - wal: w, - r: newWALReader(w.files, w.logger), - } -} - -func (w *SegmentWAL) getBuffer() *encoding.Encbuf { - b := w.buffers.Get() - if b == nil { - return &encoding.Encbuf{B: make([]byte, 0, 64*1024)} - } - return b.(*encoding.Encbuf) -} - -func (w *SegmentWAL) putBuffer(b *encoding.Encbuf) { - b.Reset() - w.buffers.Put(b) -} - -// Truncate deletes the values prior to mint and the series which the keep function -// does not indicate to preserve. -func (w *SegmentWAL) Truncate(mint int64, keep func(chunks.HeadSeriesRef) bool) error { - // The last segment is always active. - if len(w.files) < 2 { - return nil - } - var candidates []*segmentFile - - // All files have to be traversed as there could be two segments for a block - // with first block having times (10000, 20000) and SECOND one having (0, 10000). 
- for _, sf := range w.files[:len(w.files)-1] { - if sf.maxTime >= mint { - break - } - // Past WAL files are closed. We have to reopen them for another read. - f, err := w.openSegmentFile(sf.Name()) - if err != nil { - return fmt.Errorf("open old WAL segment for read: %w", err) - } - candidates = append(candidates, &segmentFile{ - File: f, - minSeries: sf.minSeries, - maxTime: sf.maxTime, - }) - } - if len(candidates) == 0 { - return nil - } - - r := newWALReader(candidates, w.logger) - - // Create a new tmp file. - f, err := w.createSegmentFile(filepath.Join(w.dirFile.Name(), "compact.tmp")) - if err != nil { - return fmt.Errorf("create compaction segment: %w", err) - } - defer func() { - if err := os.RemoveAll(f.Name()); err != nil { - level.Error(w.logger).Log("msg", "remove tmp file", "err", err.Error()) - } - }() - - var ( - csf = newSegmentFile(f) - crc32 = newCRC32() - decSeries = []record.RefSeries{} - activeSeries = []record.RefSeries{} - ) - - for r.next() { - rt, flag, byt := r.at() - - if rt != WALEntrySeries { - continue - } - decSeries = decSeries[:0] - activeSeries = activeSeries[:0] - - err := r.decodeSeries(flag, byt, &decSeries) - if err != nil { - return fmt.Errorf("decode samples while truncating: %w", err) - } - for _, s := range decSeries { - if keep(s.Ref) { - activeSeries = append(activeSeries, s) - } - } - - buf := w.getBuffer() - flag = w.encodeSeries(buf, activeSeries) - - _, err = w.writeTo(csf, crc32, WALEntrySeries, flag, buf.Get()) - w.putBuffer(buf) - - if err != nil { - return fmt.Errorf("write to compaction segment: %w", err) - } - } - if err := r.Err(); err != nil { - return fmt.Errorf("read candidate WAL files: %w", err) - } - - off, err := csf.Seek(0, io.SeekCurrent) - if err != nil { - return err - } - if err := csf.Truncate(off); err != nil { - return err - } - if err := csf.Sync(); err != nil { - return nil - } - if err := csf.Close(); err != nil { - return nil - } - - _ = candidates[0].Close() // need close before remove on platform windows - if err := fileutil.Replace(csf.Name(), candidates[0].Name()); err != nil { - return fmt.Errorf("rename compaction segment: %w", err) - } - for _, f := range candidates[1:] { - f.Close() // need close before remove on platform windows - if err := os.RemoveAll(f.Name()); err != nil { - return fmt.Errorf("delete WAL segment file: %w", err) - } - } - if err := w.dirFile.Sync(); err != nil { - return err - } - - // The file object of csf still holds the name before rename. Recreate it so - // subsequent truncations do not look at a non-existent file name. - csf.File, err = w.openSegmentFile(candidates[0].Name()) - if err != nil { - return err - } - // We don't need it to be open. - if err := csf.Close(); err != nil { - return err - } - - w.mtx.Lock() - w.files = append([]*segmentFile{csf}, w.files[len(candidates):]...) - w.mtx.Unlock() - - return nil -} - -// LogSeries writes a batch of new series labels to the log. -// The series have to be ordered. -func (w *SegmentWAL) LogSeries(series []record.RefSeries) error { - buf := w.getBuffer() - - flag := w.encodeSeries(buf, series) - - w.mtx.Lock() - defer w.mtx.Unlock() - - err := w.write(WALEntrySeries, flag, buf.Get()) - - w.putBuffer(buf) - - if err != nil { - return fmt.Errorf("log series: %w", err) - } - - tf := w.head() - - for _, s := range series { - if tf.minSeries > s.Ref { - tf.minSeries = s.Ref - } - } - return nil -} - -// LogSamples writes a batch of new samples to the log. 
-func (w *SegmentWAL) LogSamples(samples []record.RefSample) error { - buf := w.getBuffer() - - flag := w.encodeSamples(buf, samples) - - w.mtx.Lock() - defer w.mtx.Unlock() - - err := w.write(WALEntrySamples, flag, buf.Get()) - - w.putBuffer(buf) - - if err != nil { - return fmt.Errorf("log series: %w", err) - } - tf := w.head() - - for _, s := range samples { - if tf.maxTime < s.T { - tf.maxTime = s.T - } - } - return nil -} - -// LogDeletes write a batch of new deletes to the log. -func (w *SegmentWAL) LogDeletes(stones []tombstones.Stone) error { - buf := w.getBuffer() - - flag := w.encodeDeletes(buf, stones) - - w.mtx.Lock() - defer w.mtx.Unlock() - - err := w.write(WALEntryDeletes, flag, buf.Get()) - - w.putBuffer(buf) - - if err != nil { - return fmt.Errorf("log series: %w", err) - } - tf := w.head() - - for _, s := range stones { - for _, iv := range s.Intervals { - if tf.maxTime < iv.Maxt { - tf.maxTime = iv.Maxt - } - } - } - return nil -} - -// openSegmentFile opens the given segment file and consumes and validates header. -func (w *SegmentWAL) openSegmentFile(name string) (*os.File, error) { - // We must open all files in read/write mode as we may have to truncate along - // the way and any file may become the head. - f, err := os.OpenFile(name, os.O_RDWR, 0o666) - if err != nil { - return nil, err - } - metab := make([]byte, 8) - - // If there is an error, we need close f for platform windows before gc. - // Otherwise, file op may fail. - hasError := true - defer func() { - if hasError { - f.Close() - } - }() - - switch n, err := f.Read(metab); { - case err != nil: - return nil, fmt.Errorf("validate meta %q: %w", f.Name(), err) - case n != 8: - return nil, fmt.Errorf("invalid header size %d in %q", n, f.Name()) - } - - if m := binary.BigEndian.Uint32(metab[:4]); m != WALMagic { - return nil, fmt.Errorf("invalid magic header %x in %q", m, f.Name()) - } - if metab[4] != WALFormatDefault { - return nil, fmt.Errorf("unknown WAL segment format %d in %q", metab[4], f.Name()) - } - hasError = false - return f, nil -} - -// createSegmentFile creates a new segment file with the given name. It preallocates -// the standard segment size if possible and writes the header. -func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) { - f, err := os.Create(name) - if err != nil { - return nil, err - } - if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil { - return nil, err - } - // Write header metadata for new file. - metab := make([]byte, 8) - binary.BigEndian.PutUint32(metab[:4], WALMagic) - metab[4] = WALFormatDefault - - if _, err := f.Write(metab); err != nil { - return nil, err - } - return f, err -} - -// cut finishes the currently active segments and opens the next one. -// The encoder is reset to point to the new segment. -func (w *SegmentWAL) cut() error { - // Sync current head to disk and close. - if hf := w.head(); hf != nil { - if err := w.flush(); err != nil { - return err - } - // Finish last segment asynchronously to not block the WAL moving along - // in the new segment. 
- go func() { - w.actorc <- func() error { - off, err := hf.Seek(0, io.SeekCurrent) - if err != nil { - return fmt.Errorf("finish old segment %s: %w", hf.Name(), err) - } - if err := hf.Truncate(off); err != nil { - return fmt.Errorf("finish old segment %s: %w", hf.Name(), err) - } - if err := hf.Sync(); err != nil { - return fmt.Errorf("finish old segment %s: %w", hf.Name(), err) - } - if err := hf.Close(); err != nil { - return fmt.Errorf("finish old segment %s: %w", hf.Name(), err) - } - return nil - } - }() - } - - p, _, err := nextSequenceFile(w.dirFile.Name()) - if err != nil { - return err - } - f, err := w.createSegmentFile(p) - if err != nil { - return err - } - - go func() { - w.actorc <- func() error { - if err := w.dirFile.Sync(); err != nil { - return fmt.Errorf("sync WAL directory: %w", err) - } - return nil - } - }() - - w.files = append(w.files, newSegmentFile(f)) - - // TODO(gouthamve): make the buffer size a constant. - w.cur = bufio.NewWriterSize(f, 8*1024*1024) - w.curN = 8 - - return nil -} - -func (w *SegmentWAL) head() *segmentFile { - if len(w.files) == 0 { - return nil - } - return w.files[len(w.files)-1] -} - -// Sync flushes the changes to disk. -func (w *SegmentWAL) Sync() error { - var head *segmentFile - var err error - - // Flush the writer and retrieve the reference to the head segment under mutex lock. - func() { - w.mtx.Lock() - defer w.mtx.Unlock() - if err = w.flush(); err != nil { - return - } - head = w.head() - }() - if err != nil { - return fmt.Errorf("flush buffer: %w", err) - } - if head != nil { - // But only fsync the head segment after releasing the mutex as it will block on disk I/O. - start := time.Now() - err := fileutil.Fdatasync(head.File) - w.metrics.fsyncDuration.Observe(time.Since(start).Seconds()) - return err - } - return nil -} - -func (w *SegmentWAL) sync() error { - if err := w.flush(); err != nil { - return err - } - if w.head() == nil { - return nil - } - - start := time.Now() - err := fileutil.Fdatasync(w.head().File) - w.metrics.fsyncDuration.Observe(time.Since(start).Seconds()) - return err -} - -func (w *SegmentWAL) flush() error { - if w.cur == nil { - return nil - } - return w.cur.Flush() -} - -func (w *SegmentWAL) run(interval time.Duration) { - var tick <-chan time.Time - - if interval > 0 { - ticker := time.NewTicker(interval) - defer ticker.Stop() - tick = ticker.C - } - defer close(w.donec) - - for { - // Processing all enqueued operations has precedence over shutdown and - // background syncs. - select { - case f := <-w.actorc: - if err := f(); err != nil { - level.Error(w.logger).Log("msg", "operation failed", "err", err) - } - continue - default: - } - select { - case <-w.stopc: - return - case f := <-w.actorc: - if err := f(); err != nil { - level.Error(w.logger).Log("msg", "operation failed", "err", err) - } - case <-tick: - if err := w.Sync(); err != nil { - level.Error(w.logger).Log("msg", "sync failed", "err", err) - } - } - } -} - -// Close syncs all data and closes the underlying resources. -func (w *SegmentWAL) Close() error { - // Make sure you can call Close() multiple times. - select { - case <-w.stopc: - return nil // Already closed. - default: - } - - close(w.stopc) - <-w.donec - - w.mtx.Lock() - defer w.mtx.Unlock() - - if err := w.sync(); err != nil { - return err - } - // On opening, a WAL must be fully consumed once. Afterwards - // only the current segment will still be open. 
- if hf := w.head(); hf != nil { - if err := hf.Close(); err != nil { - return fmt.Errorf("closing WAL head %s: %w", hf.Name(), err) - } - } - if err := w.dirFile.Close(); err != nil { - return fmt.Errorf("closing WAL dir %s: %w", w.dirFile.Name(), err) - } - return nil -} - -func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error { - // Cut to the next segment if the entry exceeds the file size unless it would also - // exceed the size of a new segment. - // TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize. - var ( - sz = int64(len(buf)) + 6 - newsz = w.curN + sz - ) - // XXX(fabxc): this currently cuts a new file whenever the WAL was newly opened. - // Probably fine in general but may yield a lot of short files in some cases. - if w.cur == nil || w.curN > w.segmentSize || newsz > w.segmentSize && sz <= w.segmentSize { - if err := w.cut(); err != nil { - return err - } - } - n, err := w.writeTo(w.cur, w.crc32, t, flag, buf) - - w.curN += int64(n) - - return err -} - -func (w *SegmentWAL) writeTo(wr io.Writer, crc32 hash.Hash, t WALEntryType, flag uint8, buf []byte) (int, error) { - if len(buf) == 0 { - return 0, nil - } - crc32.Reset() - wr = io.MultiWriter(crc32, wr) - - var b [6]byte - b[0] = byte(t) - b[1] = flag - - binary.BigEndian.PutUint32(b[2:], uint32(len(buf))) - - n1, err := wr.Write(b[:]) - if err != nil { - return n1, err - } - n2, err := wr.Write(buf) - if err != nil { - return n1 + n2, err - } - n3, err := wr.Write(crc32.Sum(b[:0])) - - return n1 + n2 + n3, err -} - -const ( - walSeriesSimple = 1 - walSamplesSimple = 1 - walDeletesSimple = 1 -) - -func (w *SegmentWAL) encodeSeries(buf *encoding.Encbuf, series []record.RefSeries) uint8 { - for _, s := range series { - buf.PutBE64(uint64(s.Ref)) - record.EncodeLabels(buf, s.Labels) - } - return walSeriesSimple -} - -func (w *SegmentWAL) encodeSamples(buf *encoding.Encbuf, samples []record.RefSample) uint8 { - if len(samples) == 0 { - return walSamplesSimple - } - // Store base timestamp and base reference number of first sample. - // All samples encode their timestamp and ref as delta to those. - // - // TODO(fabxc): optimize for all samples having the same timestamp. - first := samples[0] - - buf.PutBE64(uint64(first.Ref)) - buf.PutBE64int64(first.T) - - for _, s := range samples { - buf.PutVarint64(int64(s.Ref) - int64(first.Ref)) - buf.PutVarint64(s.T - first.T) - buf.PutBE64(math.Float64bits(s.V)) - } - return walSamplesSimple -} - -func (w *SegmentWAL) encodeDeletes(buf *encoding.Encbuf, stones []tombstones.Stone) uint8 { - for _, s := range stones { - for _, iv := range s.Intervals { - buf.PutBE64(uint64(s.Ref)) - buf.PutVarint64(iv.Mint) - buf.PutVarint64(iv.Maxt) - } - } - return walDeletesSimple -} - -// walReader decodes and emits write ahead log entries. -type walReader struct { - logger log.Logger - - files []*segmentFile - cur int - buf []byte - crc32 hash.Hash32 - dec record.Decoder - - curType WALEntryType - curFlag byte - curBuf []byte - lastOffset int64 // offset after last successfully read entry - - err error -} - -func newWALReader(files []*segmentFile, l log.Logger) *walReader { - if l == nil { - l = log.NewNopLogger() - } - return &walReader{ - logger: l, - files: files, - buf: make([]byte, 0, 128*4096), - crc32: newCRC32(), - dec: record.NewDecoder(labels.NewSymbolTable()), - } -} - -// Err returns the last error the reader encountered. 
-func (r *walReader) Err() error { - return r.err -} - -func (r *walReader) Read( - seriesf func([]record.RefSeries), - samplesf func([]record.RefSample), - deletesf func([]tombstones.Stone), -) error { - // Concurrency for replaying the WAL is very limited. We at least split out decoding and - // processing into separate threads. - // Historically, the processing is the bottleneck with reading and decoding using only - // 15% of the CPU. - var ( - seriesPool zeropool.Pool[[]record.RefSeries] - samplePool zeropool.Pool[[]record.RefSample] - deletePool zeropool.Pool[[]tombstones.Stone] - ) - donec := make(chan struct{}) - datac := make(chan interface{}, 100) - - go func() { - defer close(donec) - - for x := range datac { - switch v := x.(type) { - case []record.RefSeries: - if seriesf != nil { - seriesf(v) - } - seriesPool.Put(v[:0]) - case []record.RefSample: - if samplesf != nil { - samplesf(v) - } - samplePool.Put(v[:0]) - case []tombstones.Stone: - if deletesf != nil { - deletesf(v) - } - deletePool.Put(v[:0]) - default: - level.Error(r.logger).Log("msg", "unexpected data type") - } - } - }() - - var err error - - for r.next() { - et, flag, b := r.at() - - // In decoding below we never return a walCorruptionErr for now. - // Those should generally be caught by entry decoding before. - switch et { - case WALEntrySeries: - series := seriesPool.Get() - if series == nil { - series = make([]record.RefSeries, 0, 512) - } - - err = r.decodeSeries(flag, b, &series) - if err != nil { - err = fmt.Errorf("decode series entry: %w", err) - break - } - datac <- series - - cf := r.current() - for _, s := range series { - if cf.minSeries > s.Ref { - cf.minSeries = s.Ref - } - } - case WALEntrySamples: - samples := samplePool.Get() - if samples == nil { - samples = make([]record.RefSample, 0, 512) - } - - err = r.decodeSamples(flag, b, &samples) - if err != nil { - err = fmt.Errorf("decode samples entry: %w", err) - break - } - datac <- samples - - // Update the times for the WAL segment file. - cf := r.current() - for _, s := range samples { - if cf.maxTime < s.T { - cf.maxTime = s.T - } - } - case WALEntryDeletes: - deletes := deletePool.Get() - if deletes == nil { - deletes = make([]tombstones.Stone, 0, 512) - } - - err = r.decodeDeletes(flag, b, &deletes) - if err != nil { - err = fmt.Errorf("decode delete entry: %w", err) - break - } - datac <- deletes - - // Update the times for the WAL segment file. - cf := r.current() - for _, s := range deletes { - for _, iv := range s.Intervals { - if cf.maxTime < iv.Maxt { - cf.maxTime = iv.Maxt - } - } - } - } - } - close(datac) - <-donec - - if err != nil { - return err - } - if err := r.Err(); err != nil { - return fmt.Errorf("read entry: %w", err) - } - return nil -} - -func (r *walReader) at() (WALEntryType, byte, []byte) { - return r.curType, r.curFlag, r.curBuf -} - -// next returns decodes the next entry pair and returns true -// if it was successful. -func (r *walReader) next() bool { - if r.cur >= len(r.files) { - return false - } - cf := r.files[r.cur] - - // Remember the offset after the last correctly read entry. If the next one - // is corrupted, this is where we can safely truncate. - r.lastOffset, r.err = cf.Seek(0, io.SeekCurrent) - if r.err != nil { - return false - } - - et, flag, b, err := r.entry(cf) - // If we reached the end of the reader, advance to the next one - // and close. - // Do not close on the last one as it will still be appended to. 
- if errors.Is(err, io.EOF) { - if r.cur == len(r.files)-1 { - return false - } - // Current reader completed, close and move to the next one. - if err := cf.Close(); err != nil { - r.err = err - return false - } - r.cur++ - return r.next() - } - if err != nil { - r.err = err - return false - } - - r.curType = et - r.curFlag = flag - r.curBuf = b - return r.err == nil -} - -func (r *walReader) current() *segmentFile { - return r.files[r.cur] -} - -// walCorruptionErr is a type wrapper for errors that indicate WAL corruption -// and trigger a truncation. -type walCorruptionErr struct { - err error - file int - lastOffset int64 -} - -func (e *walCorruptionErr) Error() string { - return fmt.Sprintf("%s <file: %d, lastOffset: %d>", e.err, e.file, e.lastOffset) -} - -func (e *walCorruptionErr) Unwrap() error { - return e.err -} - -func (r *walReader) corruptionErr(s string, args ...interface{}) error { - return &walCorruptionErr{ - err: fmt.Errorf(s, args...), - file: r.cur, - lastOffset: r.lastOffset, - } -} - -func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { - r.crc32.Reset() - tr := io.TeeReader(cr, r.crc32) - - b := make([]byte, 6) - switch n, err := tr.Read(b); { - case err != nil: - return 0, 0, nil, err - case n != 6: - return 0, 0, nil, r.corruptionErr("invalid entry header size %d", n) - } - - var ( - etype = WALEntryType(b[0]) - flag = b[1] - length = int(binary.BigEndian.Uint32(b[2:])) - ) - // Exit if we reached pre-allocated space. - if etype == 0 { - return 0, 0, nil, io.EOF - } - if etype != WALEntrySeries && etype != WALEntrySamples && etype != WALEntryDeletes { - return 0, 0, nil, r.corruptionErr("invalid entry type %d", etype) - } - - if length > len(r.buf) { - r.buf = make([]byte, length) - } - buf := r.buf[:length] - - switch n, err := tr.Read(buf); { - case err != nil: - return 0, 0, nil, err - case n != length: - return 0, 0, nil, r.corruptionErr("invalid entry body size %d", n) - } - - switch n, err := cr.Read(b[:4]); { - case err != nil: - return 0, 0, nil, err - case n != 4: - return 0, 0, nil, r.corruptionErr("invalid checksum length %d", n) - } - if exp, has := binary.BigEndian.Uint32(b[:4]), r.crc32.Sum32(); has != exp { - return 0, 0, nil, r.corruptionErr("unexpected CRC32 checksum %x, want %x", has, exp) - } - - return etype, flag, buf, nil -} - -func (r *walReader) decodeSeries(flag byte, b []byte, res *[]record.RefSeries) error { - dec := encoding.Decbuf{B: b} - - for len(dec.B) > 0 && dec.Err() == nil { - ref := chunks.HeadSeriesRef(dec.Be64()) - lset := r.dec.DecodeLabels(&dec) - - *res = append(*res, record.RefSeries{ - Ref: ref, - Labels: lset, - }) - } - if dec.Err() != nil { - return dec.Err() - } - if len(dec.B) > 0 { - return fmt.Errorf("unexpected %d bytes left in entry", len(dec.B)) - } - return nil -} - -func (r *walReader) decodeSamples(flag byte, b []byte, res *[]record.RefSample) error { - if len(b) == 0 { - return nil - } - dec := encoding.Decbuf{B: b} - - var ( - baseRef = dec.Be64() - baseTime = dec.Be64int64() - ) - - for len(dec.B) > 0 && dec.Err() == nil { - dref := dec.Varint64() - dtime := dec.Varint64() - val := dec.Be64() - - *res = append(*res, record.RefSample{ - Ref: chunks.HeadSeriesRef(int64(baseRef) + dref), - T: baseTime + dtime, - V: math.Float64frombits(val), - }) - } - - if err := dec.Err(); err != nil { - return fmt.Errorf("decode error after %d samples: %w", len(*res), err) - } - if len(dec.B) > 0 { - return fmt.Errorf("unexpected %d bytes left in entry", len(dec.B)) - } - return nil -} - -func (r *walReader)
decodeDeletes(flag byte, b []byte, res *[]tombstones.Stone) error { - dec := &encoding.Decbuf{B: b} - - for dec.Len() > 0 && dec.Err() == nil { - *res = append(*res, tombstones.Stone{ - Ref: storage.SeriesRef(dec.Be64()), - Intervals: tombstones.Intervals{ - {Mint: dec.Varint64(), Maxt: dec.Varint64()}, - }, - }) - } - if dec.Err() != nil { - return dec.Err() - } - if len(dec.B) > 0 { - return fmt.Errorf("unexpected %d bytes left in entry", len(dec.B)) - } - return nil -} - -func deprecatedWALExists(logger log.Logger, dir string) (bool, error) { - // Detect whether we still have the old WAL. - fns, err := sequenceFiles(dir) - if err != nil && !os.IsNotExist(err) { - return false, fmt.Errorf("list sequence files: %w", err) - } - if len(fns) == 0 { - return false, nil // No WAL at all yet. - } - // Check header of first segment to see whether we are still dealing with an - // old WAL. - f, err := os.Open(fns[0]) - if err != nil { - return false, fmt.Errorf("check first existing segment: %w", err) - } - defer f.Close() - - var hdr [4]byte - if _, err := f.Read(hdr[:]); err != nil && !errors.Is(err, io.EOF) { - return false, fmt.Errorf("read header from first segment: %w", err) - } - // If we cannot read the magic header for segments of the old WAL, abort. - // Either it's migrated already or there's a corruption issue with which - // we cannot deal here anyway. Subsequent attempts to open the WAL will error in that case. - if binary.BigEndian.Uint32(hdr[:]) != WALMagic { - return false, nil - } - return true, nil -} - -// MigrateWAL rewrites the deprecated write ahead log into the new format. -func MigrateWAL(logger log.Logger, dir string) (err error) { - if logger == nil { - logger = log.NewNopLogger() - } - if exists, err := deprecatedWALExists(logger, dir); err != nil || !exists { - return err - } - level.Info(logger).Log("msg", "Migrating WAL format") - - tmpdir := dir + ".tmp" - if err := os.RemoveAll(tmpdir); err != nil { - return fmt.Errorf("cleanup replacement dir: %w", err) - } - repl, err := wlog.New(logger, nil, tmpdir, wlog.CompressionNone) - if err != nil { - return fmt.Errorf("open new WAL: %w", err) - } - - // It should've already been closed as part of the previous finalization. - // Do it once again in case of prior errors. - defer func() { - if err != nil { - repl.Close() - } - }() - - w, err := OpenSegmentWAL(dir, logger, time.Minute, nil) - if err != nil { - return fmt.Errorf("open old WAL: %w", err) - } - defer w.Close() - - rdr := w.Reader() - - var ( - enc record.Encoder - b []byte - ) - decErr := rdr.Read( - func(s []record.RefSeries) { - if err != nil { - return - } - err = repl.Log(enc.Series(s, b[:0])) - }, - func(s []record.RefSample) { - if err != nil { - return - } - err = repl.Log(enc.Samples(s, b[:0])) - }, - func(s []tombstones.Stone) { - if err != nil { - return - } - err = repl.Log(enc.Tombstones(s, b[:0])) - }, - ) - if decErr != nil { - return fmt.Errorf("decode old entries: %w", err) - } - if err != nil { - return fmt.Errorf("write new entries: %w", err) - } - // We explicitly close even when there is a defer for Windows to be - // able to delete it. The defer is in place to close it in-case there - // are errors above. 
- if err := w.Close(); err != nil { - return fmt.Errorf("close old WAL: %w", err) - } - if err := repl.Close(); err != nil { - return fmt.Errorf("close new WAL: %w", err) - } - if err := fileutil.Replace(tmpdir, dir); err != nil { - return fmt.Errorf("replace old WAL: %w", err) - } - return nil -} diff --git a/tsdb/wal_test.go b/tsdb/wal_test.go deleted file mode 100644 index 7794a54547..0000000000 --- a/tsdb/wal_test.go +++ /dev/null @@ -1,553 +0,0 @@ -// Copyright 2017 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build !windows - -package tsdb - -import ( - "encoding/binary" - "io" - "math/rand" - "os" - "path" - "path/filepath" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/stretchr/testify/require" - - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" - "github.com/prometheus/prometheus/tsdb/chunks" - "github.com/prometheus/prometheus/tsdb/record" - "github.com/prometheus/prometheus/tsdb/tombstones" - "github.com/prometheus/prometheus/tsdb/wlog" - "github.com/prometheus/prometheus/util/testutil" -) - -func TestSegmentWAL_cut(t *testing.T) { - tmpdir := t.TempDir() - - // This calls cut() implicitly the first time without a previous tail. - w, err := OpenSegmentWAL(tmpdir, nil, 0, nil) - require.NoError(t, err) - - require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!"))) - - require.NoError(t, w.cut()) - - // Cutting creates a new file. - require.Len(t, w.files, 2) - - require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!"))) - - require.NoError(t, w.Close()) - - for _, of := range w.files { - f, err := os.Open(of.Name()) - require.NoError(t, err) - - // Verify header data. - metab := make([]byte, 8) - _, err = f.Read(metab) - require.NoError(t, err) - require.Equal(t, WALMagic, binary.BigEndian.Uint32(metab[:4])) - require.Equal(t, WALFormatDefault, metab[4]) - - // We cannot actually check for correct pre-allocation as it is - // optional per filesystem and handled transparently. 
- et, flag, b, err := newWALReader(nil, nil).entry(f) - require.NoError(t, err) - require.Equal(t, WALEntrySeries, et) - require.Equal(t, byte(walSeriesSimple), flag) - require.Equal(t, []byte("Hello World!!"), b) - } -} - -func TestSegmentWAL_Truncate(t *testing.T) { - const ( - numMetrics = 20000 - batch = 100 - ) - series, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), numMetrics) - require.NoError(t, err) - - dir := t.TempDir() - - w, err := OpenSegmentWAL(dir, nil, 0, nil) - require.NoError(t, err) - defer func(wal *SegmentWAL) { require.NoError(t, wal.Close()) }(w) - w.segmentSize = 10000 - - for i := 0; i < numMetrics; i += batch { - var rs []record.RefSeries - - for j, s := range series[i : i+batch] { - rs = append(rs, record.RefSeries{Labels: s, Ref: chunks.HeadSeriesRef(i+j) + 1}) - } - err := w.LogSeries(rs) - require.NoError(t, err) - } - - // We mark the 2nd half of the files with a min timestamp that should discard - // them from the selection of compactable files. - for i, f := range w.files[len(w.files)/2:] { - f.maxTime = int64(1000 + i) - } - // All series in those files must be preserved regarding of the provided postings list. - boundarySeries := w.files[len(w.files)/2].minSeries - - // We truncate while keeping every 2nd series. - keep := map[chunks.HeadSeriesRef]struct{}{} - for i := 1; i <= numMetrics; i += 2 { - keep[chunks.HeadSeriesRef(i)] = struct{}{} - } - keepf := func(id chunks.HeadSeriesRef) bool { - _, ok := keep[id] - return ok - } - - err = w.Truncate(1000, keepf) - require.NoError(t, err) - - var expected []record.RefSeries - - for i := 1; i <= numMetrics; i++ { - if i%2 == 1 || chunks.HeadSeriesRef(i) >= boundarySeries { - expected = append(expected, record.RefSeries{Ref: chunks.HeadSeriesRef(i), Labels: series[i-1]}) - } - } - - // Call Truncate once again to see whether we can read the written file without - // creating a new WAL. - err = w.Truncate(1000, keepf) - require.NoError(t, err) - require.NoError(t, w.Close()) - - // The same again with a new WAL. - w, err = OpenSegmentWAL(dir, nil, 0, nil) - require.NoError(t, err) - defer func(wal *SegmentWAL) { require.NoError(t, wal.Close()) }(w) - - var readSeries []record.RefSeries - r := w.Reader() - - require.NoError(t, r.Read(func(s []record.RefSeries) { - readSeries = append(readSeries, s...) - }, nil, nil)) - - testutil.RequireEqual(t, expected, readSeries) -} - -// Symmetrical test of reading and writing to the WAL via its main interface. -func TestSegmentWAL_Log_Restore(t *testing.T) { - const ( - numMetrics = 50 - iterations = 5 - stepSize = 5 - ) - // Generate testing data. It does not make semantic sense but - // for the purpose of this test. - series, err := labels.ReadLabels(filepath.Join("testdata", "20kseries.json"), numMetrics) - require.NoError(t, err) - - dir := t.TempDir() - - var ( - recordedSeries [][]record.RefSeries - recordedSamples [][]record.RefSample - recordedDeletes [][]tombstones.Stone - ) - var totalSamples int - - // Open WAL a bunch of times, validate all previous data can be read, - // write more data to it, close it. - for k := 0; k < numMetrics; k += numMetrics / iterations { - w, err := OpenSegmentWAL(dir, nil, 0, nil) - require.NoError(t, err) - - // Set smaller segment size so we can actually write several files. 
- w.segmentSize = 1000 * 1000 - - r := w.Reader() - - var ( - resultSeries [][]record.RefSeries - resultSamples [][]record.RefSample - resultDeletes [][]tombstones.Stone - ) - - serf := func(series []record.RefSeries) { - if len(series) > 0 { - clsets := make([]record.RefSeries, len(series)) - copy(clsets, series) - resultSeries = append(resultSeries, clsets) - } - } - smplf := func(smpls []record.RefSample) { - if len(smpls) > 0 { - csmpls := make([]record.RefSample, len(smpls)) - copy(csmpls, smpls) - resultSamples = append(resultSamples, csmpls) - } - } - - delf := func(stones []tombstones.Stone) { - if len(stones) > 0 { - cst := make([]tombstones.Stone, len(stones)) - copy(cst, stones) - resultDeletes = append(resultDeletes, cst) - } - } - - require.NoError(t, r.Read(serf, smplf, delf)) - - testutil.RequireEqual(t, recordedSamples, resultSamples) - testutil.RequireEqual(t, recordedSeries, resultSeries) - testutil.RequireEqual(t, recordedDeletes, resultDeletes) - - series := series[k : k+(numMetrics/iterations)] - - // Insert in batches and generate different amounts of samples for each. - for i := 0; i < len(series); i += stepSize { - var samples []record.RefSample - var stones []tombstones.Stone - - for j := 0; j < i*10; j++ { - samples = append(samples, record.RefSample{ - Ref: chunks.HeadSeriesRef(j % 10000), - T: int64(j * 2), - V: rand.Float64(), - }) - } - - for j := 0; j < i*20; j++ { - ts := rand.Int63() - stones = append(stones, tombstones.Stone{Ref: storage.SeriesRef(rand.Uint64()), Intervals: tombstones.Intervals{{Mint: ts, Maxt: ts + rand.Int63n(10000)}}}) - } - - lbls := series[i : i+stepSize] - series := make([]record.RefSeries, 0, len(series)) - for j, l := range lbls { - series = append(series, record.RefSeries{ - Ref: chunks.HeadSeriesRef(i + j), - Labels: l, - }) - } - - require.NoError(t, w.LogSeries(series)) - require.NoError(t, w.LogSamples(samples)) - require.NoError(t, w.LogDeletes(stones)) - - if len(lbls) > 0 { - recordedSeries = append(recordedSeries, series) - } - if len(samples) > 0 { - recordedSamples = append(recordedSamples, samples) - totalSamples += len(samples) - } - if len(stones) > 0 { - recordedDeletes = append(recordedDeletes, stones) - } - } - - require.NoError(t, w.Close()) - } -} - -func TestWALRestoreCorrupted_invalidSegment(t *testing.T) { - dir := t.TempDir() - - wal, err := OpenSegmentWAL(dir, nil, 0, nil) - require.NoError(t, err) - defer func(wal *SegmentWAL) { require.NoError(t, wal.Close()) }(wal) - - _, err = wal.createSegmentFile(filepath.Join(dir, "000000")) - require.NoError(t, err) - f, err := wal.createSegmentFile(filepath.Join(dir, "000001")) - require.NoError(t, err) - f2, err := wal.createSegmentFile(filepath.Join(dir, "000002")) - require.NoError(t, err) - require.NoError(t, f2.Close()) - - // Make header of second segment invalid. - _, err = f.WriteAt([]byte{1, 2, 3, 4}, 0) - require.NoError(t, err) - require.NoError(t, f.Close()) - - require.NoError(t, wal.Close()) - - wal, err = OpenSegmentWAL(dir, log.NewLogfmtLogger(os.Stderr), 0, nil) - require.NoError(t, err) - defer func(wal *SegmentWAL) { require.NoError(t, wal.Close()) }(wal) - - files, err := os.ReadDir(dir) - require.NoError(t, err) - fns := []string{} - for _, f := range files { - fns = append(fns, f.Name()) - } - require.Equal(t, []string{"000000"}, fns) -} - -// Test reading from a WAL that has been corrupted through various means. 
-func TestWALRestoreCorrupted(t *testing.T) { - cases := []struct { - name string - f func(*testing.T, *SegmentWAL) - }{ - { - name: "truncate_checksum", - f: func(t *testing.T, w *SegmentWAL) { - f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0o666) - require.NoError(t, err) - defer f.Close() - - off, err := f.Seek(0, io.SeekEnd) - require.NoError(t, err) - - require.NoError(t, f.Truncate(off-1)) - }, - }, - { - name: "truncate_body", - f: func(t *testing.T, w *SegmentWAL) { - f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0o666) - require.NoError(t, err) - defer f.Close() - - off, err := f.Seek(0, io.SeekEnd) - require.NoError(t, err) - - require.NoError(t, f.Truncate(off-8)) - }, - }, - { - name: "body_content", - f: func(t *testing.T, w *SegmentWAL) { - f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0o666) - require.NoError(t, err) - defer f.Close() - - off, err := f.Seek(0, io.SeekEnd) - require.NoError(t, err) - - // Write junk before checksum starts. - _, err = f.WriteAt([]byte{1, 2, 3, 4}, off-8) - require.NoError(t, err) - }, - }, - { - name: "checksum", - f: func(t *testing.T, w *SegmentWAL) { - f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0o666) - require.NoError(t, err) - defer f.Close() - - off, err := f.Seek(0, io.SeekEnd) - require.NoError(t, err) - - // Write junk into checksum - _, err = f.WriteAt([]byte{1, 2, 3, 4}, off-4) - require.NoError(t, err) - }, - }, - } - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - // Generate testing data. It does not make semantic sense but - // for the purpose of this test. - dir := t.TempDir() - - w, err := OpenSegmentWAL(dir, nil, 0, nil) - require.NoError(t, err) - defer func(wal *SegmentWAL) { require.NoError(t, wal.Close()) }(w) - - require.NoError(t, w.LogSamples([]record.RefSample{{T: 1, V: 2}})) - require.NoError(t, w.LogSamples([]record.RefSample{{T: 2, V: 3}})) - - require.NoError(t, w.cut()) - - // Sleep 2 seconds to avoid error where cut and test "cases" function may write or - // truncate the file out of orders as "cases" are not synchronized with cut. - // Hopefully cut will complete by 2 seconds. - time.Sleep(2 * time.Second) - - require.NoError(t, w.LogSamples([]record.RefSample{{T: 3, V: 4}})) - require.NoError(t, w.LogSamples([]record.RefSample{{T: 5, V: 6}})) - - require.NoError(t, w.Close()) - - // cut() truncates and fsyncs the first segment async. If it happens after - // the corruption we apply below, the corruption will be overwritten again. - // Fire and forget a sync to avoid flakiness. - w.files[0].Sync() - // Corrupt the second entry in the first file. - // After re-opening we must be able to read the first entry - // and the rest, including the second file, must be truncated for clean further - // writes. - c.f(t, w) - - logger := log.NewLogfmtLogger(os.Stderr) - - w2, err := OpenSegmentWAL(dir, logger, 0, nil) - require.NoError(t, err) - defer func(wal *SegmentWAL) { require.NoError(t, wal.Close()) }(w2) - - r := w2.Reader() - - serf := func(l []record.RefSeries) { - require.Empty(t, l) - } - - // Weird hack to check order of reads. 
- i := 0 - samplef := func(s []record.RefSample) { - if i == 0 { - require.Equal(t, []record.RefSample{{T: 1, V: 2}}, s) - i++ - } else { - require.Equal(t, []record.RefSample{{T: 99, V: 100}}, s) - } - } - - require.NoError(t, r.Read(serf, samplef, nil)) - - require.NoError(t, w2.LogSamples([]record.RefSample{{T: 99, V: 100}})) - require.NoError(t, w2.Close()) - - // We should see the first valid entry and the new one, everything after - // is truncated. - w3, err := OpenSegmentWAL(dir, logger, 0, nil) - require.NoError(t, err) - defer func(wal *SegmentWAL) { require.NoError(t, wal.Close()) }(w3) - - r = w3.Reader() - - i = 0 - require.NoError(t, r.Read(serf, samplef, nil)) - }) - } -} - -func TestMigrateWAL_Empty(t *testing.T) { - // The migration procedure must properly deal with a zero-length segment, - // which is valid in the new format. - dir := t.TempDir() - - wdir := path.Join(dir, "wal") - - // Initialize empty WAL. - w, err := wlog.New(nil, nil, wdir, wlog.CompressionNone) - require.NoError(t, err) - require.NoError(t, w.Close()) - - require.NoError(t, MigrateWAL(nil, wdir)) -} - -func TestMigrateWAL_Fuzz(t *testing.T) { - dir := t.TempDir() - - wdir := path.Join(dir, "wal") - - // Should pass if no WAL exists yet. - require.NoError(t, MigrateWAL(nil, wdir)) - - oldWAL, err := OpenSegmentWAL(wdir, nil, time.Minute, nil) - require.NoError(t, err) - - // Write some data. - require.NoError(t, oldWAL.LogSeries([]record.RefSeries{ - {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, - {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, - })) - require.NoError(t, oldWAL.LogSamples([]record.RefSample{ - {Ref: 1, T: 100, V: 200}, - {Ref: 2, T: 300, V: 400}, - })) - require.NoError(t, oldWAL.LogSeries([]record.RefSeries{ - {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, - })) - require.NoError(t, oldWAL.LogSamples([]record.RefSample{ - {Ref: 3, T: 100, V: 200}, - {Ref: 4, T: 300, V: 400}, - })) - require.NoError(t, oldWAL.LogDeletes([]tombstones.Stone{ - {Ref: 1, Intervals: []tombstones.Interval{{Mint: 100, Maxt: 200}}}, - })) - - require.NoError(t, oldWAL.Close()) - - // Perform migration. - require.NoError(t, MigrateWAL(nil, wdir)) - - w, err := wlog.New(nil, nil, wdir, wlog.CompressionNone) - require.NoError(t, err) - - // We can properly write some new data after migration. - var enc record.Encoder - require.NoError(t, w.Log(enc.Samples([]record.RefSample{ - {Ref: 500, T: 1, V: 1}, - }, nil))) - - require.NoError(t, w.Close()) - - // Read back all data. 
- sr, err := wlog.NewSegmentsReader(wdir) - require.NoError(t, err) - - r := wlog.NewReader(sr) - var res []interface{} - dec := record.NewDecoder(labels.NewSymbolTable()) - - for r.Next() { - rec := r.Record() - - switch dec.Type(rec) { - case record.Series: - s, err := dec.Series(rec, nil) - require.NoError(t, err) - res = append(res, s) - case record.Samples: - s, err := dec.Samples(rec, nil) - require.NoError(t, err) - res = append(res, s) - case record.Tombstones: - s, err := dec.Tombstones(rec, nil) - require.NoError(t, err) - res = append(res, s) - default: - require.Fail(t, "unknown record type %d", dec.Type(rec)) - } - } - require.NoError(t, r.Err()) - - testutil.RequireEqual(t, []interface{}{ - []record.RefSeries{ - {Ref: 100, Labels: labels.FromStrings("abc", "def", "123", "456")}, - {Ref: 1, Labels: labels.FromStrings("abc", "def2", "1234", "4567")}, - }, - []record.RefSample{{Ref: 1, T: 100, V: 200}, {Ref: 2, T: 300, V: 400}}, - []record.RefSeries{ - {Ref: 200, Labels: labels.FromStrings("xyz", "def", "foo", "bar")}, - }, - []record.RefSample{{Ref: 3, T: 100, V: 200}, {Ref: 4, T: 300, V: 400}}, - []tombstones.Stone{{Ref: 1, Intervals: []tombstones.Interval{{Mint: 100, Maxt: 200}}}}, - []record.RefSample{{Ref: 500, T: 1, V: 1}}, - }, res) - - // Migrating an already migrated WAL shouldn't do anything. - require.NoError(t, MigrateWAL(nil, wdir)) -} diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 48938fce12..b56026e45e 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -177,13 +177,6 @@ type TSDBAdminStats interface { WALReplayStatus() (tsdb.WALReplayStatus, error) } -// QueryEngine defines the interface for the *promql.Engine, so it can be replaced, wrapped or mocked. -type QueryEngine interface { - SetQueryLogger(l promql.QueryLogger) - NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) - NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) -} - type QueryOpts interface { EnablePerStepStats() bool LookbackDelta() time.Duration @@ -193,7 +186,7 @@ type QueryOpts interface { // them using the provided storage and query engine. type API struct { Queryable storage.SampleAndChunkQueryable - QueryEngine QueryEngine + QueryEngine promql.QueryEngine ExemplarQueryable storage.ExemplarQueryable scrapePoolsRetriever func(context.Context) ScrapePoolsRetriever @@ -226,7 +219,7 @@ type API struct { // NewAPI returns an initialized API type. 
func NewAPI( - qe QueryEngine, + qe promql.QueryEngine, q storage.SampleAndChunkQueryable, ap storage.Appendable, eq storage.ExemplarQueryable, diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 6ccfdb5d7f..63a0225357 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -3881,8 +3881,6 @@ type fakeEngine struct { query fakeQuery } -func (e *fakeEngine) SetQueryLogger(promql.QueryLogger) {} - func (e *fakeEngine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { return &e.query, nil } diff --git a/web/federate_test.go b/web/federate_test.go index b8749dfa32..16637f60a3 100644 --- a/web/federate_test.go +++ b/web/federate_test.go @@ -48,29 +48,29 @@ var scenarios = map[string]struct { }{ "empty": { params: "", - code: 200, + code: http.StatusOK, body: ``, }, "match nothing": { params: "match[]=does_not_match_anything", - code: 200, + code: http.StatusOK, body: ``, }, "invalid params from the beginning": { params: "match[]=-not-a-valid-metric-name", - code: 400, + code: http.StatusBadRequest, body: `1:1: parse error: unexpected <op:->`, }, "invalid params somewhere in the middle": { params: "match[]=not-a-valid-metric-name", - code: 400, + code: http.StatusBadRequest, body: `1:4: parse error: unexpected <op:->`, }, "test_metric1": { params: "match[]=test_metric1", - code: 200, + code: http.StatusOK, body: `# TYPE test_metric1 untyped test_metric1{foo="bar",instance="i"} 10000 6000000 test_metric1{foo="boo",instance="i"} 1 6000000 @@ -78,33 +78,33 @@ test_metric1{foo="boo",instance="i"} 1 6000000 }, "test_metric2": { params: "match[]=test_metric2", - code: 200, + code: http.StatusOK, body: `# TYPE test_metric2 untyped test_metric2{foo="boo",instance="i"} 1 6000000 `, }, "test_metric_without_labels": { params: "match[]=test_metric_without_labels", - code: 200, + code: http.StatusOK, body: `# TYPE test_metric_without_labels untyped test_metric_without_labels{instance=""} 1001 6000000 `, }, "test_stale_metric": { params: "match[]=test_metric_stale", - code: 200, + code: http.StatusOK, body: ``, }, "test_old_metric": { params: "match[]=test_metric_old", - code: 200, + code: http.StatusOK, body: `# TYPE test_metric_old untyped test_metric_old{instance=""} 981 5880000 `, }, "{foo='boo'}": { params: "match[]={foo='boo'}", - code: 200, + code: http.StatusOK, body: `# TYPE test_metric1 untyped test_metric1{foo="boo",instance="i"} 1 6000000 # TYPE test_metric2 untyped @@ -113,7 +113,7 @@ test_metric2{foo="boo",instance="i"} 1 6000000 }, "two matchers": { params: "match[]=test_metric1&match[]=test_metric2", - code: 200, + code: http.StatusOK, body: `# TYPE test_metric1 untyped test_metric1{foo="bar",instance="i"} 10000 6000000 test_metric1{foo="boo",instance="i"} 1 6000000 @@ -123,7 +123,7 @@ test_metric2{foo="boo",instance="i"} 1 6000000 }, "two matchers with overlap": { params: "match[]={__name__=~'test_metric1'}&match[]={foo='bar'}", - code: 200, + code: http.StatusOK, body: `# TYPE test_metric1 untyped test_metric1{foo="bar",instance="i"} 10000 6000000 test_metric1{foo="boo",instance="i"} 1 6000000 @@ -131,7 +131,7 @@ test_metric1{foo="boo",instance="i"} 1 6000000 }, "everything": { params: "match[]={__name__=~'.%2b'}", // '%2b' is an URL-encoded '+'.
- code: 200, + code: http.StatusOK, body: `# TYPE test_metric1 untyped test_metric1{foo="bar",instance="i"} 10000 6000000 test_metric1{foo="boo",instance="i"} 1 6000000 @@ -145,7 +145,7 @@ test_metric_without_labels{instance=""} 1001 6000000 }, "empty label value matches everything that doesn't have that label": { params: "match[]={foo='',__name__=~'.%2b'}", - code: 200, + code: http.StatusOK, body: `# TYPE test_metric_old untyped test_metric_old{instance=""} 981 5880000 # TYPE test_metric_without_labels untyped @@ -154,7 +154,7 @@ test_metric_without_labels{instance=""} 1001 6000000 }, "empty label value for a label that doesn't exist at all, matches everything": { params: "match[]={bar='',__name__=~'.%2b'}", - code: 200, + code: http.StatusOK, body: `# TYPE test_metric1 untyped test_metric1{foo="bar",instance="i"} 10000 6000000 test_metric1{foo="boo",instance="i"} 1 6000000 @@ -169,7 +169,7 @@ test_metric_without_labels{instance=""} 1001 6000000 "external labels are added if not already present": { params: "match[]={__name__=~'.%2b'}", // '%2b' is an URL-encoded '+'. externalLabels: labels.FromStrings("foo", "baz", "zone", "ie"), - code: 200, + code: http.StatusOK, body: `# TYPE test_metric1 untyped test_metric1{foo="bar",instance="i",zone="ie"} 10000 6000000 test_metric1{foo="boo",instance="i",zone="ie"} 1 6000000 @@ -186,7 +186,7 @@ test_metric_without_labels{foo="baz",instance="",zone="ie"} 1001 6000000 // know what it does anyway. params: "match[]={__name__=~'.%2b'}", // '%2b' is an URL-encoded '+'. externalLabels: labels.FromStrings("instance", "baz"), - code: 200, + code: http.StatusOK, body: `# TYPE test_metric1 untyped test_metric1{foo="bar",instance="i"} 10000 6000000 test_metric1{foo="boo",instance="i"} 1 6000000 @@ -390,7 +390,6 @@ func TestFederationWithNativeHistograms(t *testing.T) { require.Equal(t, http.StatusOK, res.Code) body, err := io.ReadAll(res.Body) require.NoError(t, err) - p := textparse.NewProtobufParser(body, false, labels.NewSymbolTable()) var actVec promql.Vector metricFamilies := 0
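The api.go change above swaps the package-local QueryEngine interface for promql.QueryEngine, which is why the fakeEngine test double can drop SetQueryLogger. Under the assumption that promql.QueryEngine declares exactly the two constructors shown in the removed interface, a minimal mock looks like this (stubEngine and its field are illustrative names, not part of the patch):

```go
package example

import (
	"context"
	"time"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/storage"
)

// stubEngine satisfies promql.QueryEngine with canned queries; no
// query-logger plumbing is needed once SetQueryLogger leaves the interface.
type stubEngine struct{ q promql.Query }

func (e *stubEngine) NewInstantQuery(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, _ string, _ time.Time) (promql.Query, error) {
	return e.q, nil
}

func (e *stubEngine) NewRangeQuery(_ context.Context, _ storage.Queryable, _ promql.QueryOpts, _ string, _, _ time.Time, _ time.Duration) (promql.Query, error) {
	return e.q, nil
}
```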