mirror of
https://github.com/prometheus/prometheus.git
synced 2026-04-02 04:11:01 +02:00
PR #17269 replaced atomic os.Rename-based file writes with os.WriteFile to fix a Windows flake. However, os.WriteFile is not atomic (it truncates then writes), and fsnotify can fire between the truncate and write, causing the watcher to read an empty file and replace valid targets with empty ones. Restore atomicity by writing to a temporary file and renaming. On Windows, retry the rename with a short backoff to handle transient "Access is denied" errors when the file watcher or readFile holds an open handle to the destination. Fixes #18237 Signed-off-by: Munem Hashmi <munem.hashmi@gmail.com>
505 lines
12 KiB
Go
505 lines
12 KiB
Go
// Copyright The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package file
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/common/model"
|
|
"github.com/stretchr/testify/require"
|
|
"go.uber.org/goleak"
|
|
|
|
"github.com/prometheus/prometheus/discovery"
|
|
"github.com/prometheus/prometheus/discovery/targetgroup"
|
|
)
|
|
|
|
// TestMain verifies that no goroutines are leaked by any test in this package.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}
|
|
|
|
// defaultWait is the maximum time the tests wait for SD updates to arrive.
const defaultWait = time.Second
|
|
|
|
// testRunner drives a file SD instance against a temporary directory and
// records every target group update the discoverer emits.
type testRunner struct {
	*testing.T
	dir           string                    // watched directory holding the SD files
	ch            chan []*targetgroup.Group // channel the discoverer sends updates on
	done, stopped chan struct{}             // done stops the receive loop; stopped signals that it has exited
	cancelSD      context.CancelFunc        // cancels the discoverer's context

	mtx        sync.Mutex                    // guards tgs and receivedAt
	tgs        map[string]*targetgroup.Group // latest group received per source
	receivedAt time.Time                     // time of the most recent update
}
|
|
|
|
func newTestRunner(t *testing.T) *testRunner {
|
|
t.Helper()
|
|
|
|
return &testRunner{
|
|
T: t,
|
|
dir: t.TempDir(),
|
|
ch: make(chan []*targetgroup.Group),
|
|
done: make(chan struct{}),
|
|
stopped: make(chan struct{}),
|
|
tgs: make(map[string]*targetgroup.Group),
|
|
}
|
|
}
|
|
|
|
// copyFile atomically copies a file to the runner's directory, keeping its
// base name, and returns the destination path.
func (t *testRunner) copyFile(src string) string {
	t.Helper()
	return t.copyFileTo(src, filepath.Base(src))
}
|
|
|
|
// copyFileTo atomically copies a file with a different name to the runner's directory.
|
|
func (t *testRunner) copyFileTo(src, name string) string {
|
|
t.Helper()
|
|
|
|
content, err := os.ReadFile(src)
|
|
require.NoError(t, err)
|
|
|
|
dst := filepath.Join(t.dir, name)
|
|
t.atomicWrite(dst, content)
|
|
|
|
return dst
|
|
}
|
|
|
|
// writeString atomically writes a string to a file (see atomicWrite for the
// atomicity guarantees).
func (t *testRunner) writeString(file, data string) {
	t.Helper()
	t.atomicWrite(file, []byte(data))
}
|
|
|
|
// atomicWrite writes data to dst atomically by writing to a temporary file
|
|
// and renaming it. The temp file is created outside the watched directory to
|
|
// avoid triggering spurious fsnotify events that could cause readFile to hold
|
|
// an open handle on dst (which would make os.Rename fail on Windows).
|
|
func (t *testRunner) atomicWrite(dst string, data []byte) {
|
|
t.Helper()
|
|
|
|
// Create the temp file via t.TempDir() rather than in t.dir (the watched
|
|
// directory). t.TempDir() returns a fresh directory on the same filesystem
|
|
// as t.dir, so os.Rename works, and cleanup is handled by the test framework.
|
|
tmp, err := os.CreateTemp(t.TempDir(), ".sd-test-*")
|
|
require.NoError(t, err)
|
|
|
|
_, err = tmp.Write(data)
|
|
require.NoError(t, err)
|
|
require.NoError(t, tmp.Close())
|
|
|
|
// On Windows, os.Rename fails if another process holds an open handle
|
|
// on dst. This can happen if a previous refresh cycle's readFile call
|
|
// hasn't released the file yet. Retry a few times to handle this;
|
|
// on Linux/macOS os.Rename always succeeds regardless, so the retry
|
|
// never triggers.
|
|
for retries := 0; ; retries++ {
|
|
err = os.Rename(tmp.Name(), dst)
|
|
if err == nil || retries >= 5 {
|
|
break
|
|
}
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// appendString appends a string to a file.
|
|
func (t *testRunner) appendString(file, data string) {
|
|
t.Helper()
|
|
|
|
f, err := os.OpenFile(file, os.O_WRONLY|os.O_APPEND, 0)
|
|
require.NoError(t, err)
|
|
defer f.Close()
|
|
|
|
_, err = f.WriteString(data)
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
// run starts the file SD and the loop receiving target groups updates.
// Each entry in files is a glob pattern relative to the runner's directory.
func (t *testRunner) run(files ...string) {
	// Receive loop: records every update (and its arrival time) under the
	// mutex until stop() closes t.done.
	go func() {
		defer close(t.stopped)
		for {
			select {
			case <-t.done:
				os.RemoveAll(t.dir)
				return
			case tgs := <-t.ch:
				t.mtx.Lock()
				t.receivedAt = time.Now()
				for _, tg := range tgs {
					t.tgs[tg.Source] = tg
				}
				t.mtx.Unlock()
			}
		}
	}()

	// Make the glob patterns absolute within the watched directory.
	for i := range files {
		files[i] = filepath.Join(t.dir, files[i])
	}
	ctx, cancel := context.WithCancel(context.Background())
	t.cancelSD = cancel
	go func() {
		conf := &SDConfig{
			Files: files,
			// Setting a high refresh interval to make sure that the tests only
			// rely on file watches.
			RefreshInterval: model.Duration(1 * time.Hour),
		}

		reg := prometheus.NewRegistry()
		refreshMetrics := discovery.NewRefreshMetrics(reg)
		metrics := conf.NewDiscovererMetrics(reg, refreshMetrics)
		require.NoError(t, metrics.Register())

		d, err := NewDiscovery(
			conf,
			nil,
			metrics,
		)
		require.NoError(t, err)

		// Blocks until ctx is canceled by stop().
		d.Run(ctx, t.ch)

		metrics.Unregister()
	}()
}
|
|
|
|
// stop shuts down the discoverer and waits for the receive loop to exit.
func (t *testRunner) stop() {
	t.cancelSD()  // stop the SD goroutine
	close(t.done) // stop the receive loop (which also removes t.dir)
	<-t.stopped   // wait until the receive loop has returned
}
|
|
|
|
func (t *testRunner) lastReceive() time.Time {
|
|
t.mtx.Lock()
|
|
defer t.mtx.Unlock()
|
|
return t.receivedAt
|
|
}
|
|
|
|
func (t *testRunner) targets() []*targetgroup.Group {
|
|
t.mtx.Lock()
|
|
defer t.mtx.Unlock()
|
|
var (
|
|
keys []string
|
|
tgs []*targetgroup.Group
|
|
)
|
|
|
|
for k := range t.tgs {
|
|
keys = append(keys, k)
|
|
}
|
|
sort.Strings(keys)
|
|
for _, k := range keys {
|
|
tgs = append(tgs, t.tgs[k])
|
|
}
|
|
return tgs
|
|
}
|
|
|
|
// requireUpdate polls until an update newer than ref arrives and matches
// expected, or fails the test after defaultWait.
func (t *testRunner) requireUpdate(ref time.Time, expected []*targetgroup.Group) {
	t.Helper()

	timeout := time.After(defaultWait)
	for {
		select {
		case <-timeout:
			t.Fatalf("Expected update but got none")
		case <-time.After(defaultWait / 10):
			// NOTE: the bare "break" statements below exit the select, not
			// the for loop, so polling continues on the next tick.
			if ref.Equal(t.lastReceive()) {
				// No update received.
				break
			}

			// We can receive partial updates so only check the result when the
			// expected number of groups is reached.
			tgs := t.targets()
			if len(tgs) != len(expected) {
				t.Logf("skipping update: expected %d targets, got %d", len(expected), len(tgs))
				break
			}
			t.requireTargetGroups(expected, tgs)
			if ref.After(time.Time{}) {
				t.Logf("update received after %v", t.lastReceive().Sub(ref))
			}
			return
		}
	}
}
|
|
|
|
func (t *testRunner) requireTargetGroups(expected, got []*targetgroup.Group) {
|
|
t.Helper()
|
|
b1, err := json.Marshal(expected)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
b2, err := json.Marshal(got)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
require.Equal(t, string(b1), string(b2))
|
|
}
|
|
|
|
// validTg() maps to fixtures/valid.{json,yml}.
|
|
func validTg(file string) []*targetgroup.Group {
|
|
return []*targetgroup.Group{
|
|
{
|
|
Targets: []model.LabelSet{
|
|
{
|
|
model.AddressLabel: model.LabelValue("localhost:9090"),
|
|
},
|
|
{
|
|
model.AddressLabel: model.LabelValue("example.org:443"),
|
|
},
|
|
},
|
|
Labels: model.LabelSet{
|
|
model.LabelName("foo"): model.LabelValue("bar"),
|
|
fileSDFilepathLabel: model.LabelValue(file),
|
|
},
|
|
Source: fileSource(file, 0),
|
|
},
|
|
{
|
|
Targets: []model.LabelSet{
|
|
{
|
|
model.AddressLabel: model.LabelValue("my.domain"),
|
|
},
|
|
},
|
|
Labels: model.LabelSet{
|
|
fileSDFilepathLabel: model.LabelValue(file),
|
|
},
|
|
Source: fileSource(file, 1),
|
|
},
|
|
}
|
|
}
|
|
|
|
// valid2Tg() maps to fixtures/valid2.{json,yml}.
|
|
func valid2Tg(file string) []*targetgroup.Group {
|
|
return []*targetgroup.Group{
|
|
{
|
|
Targets: []model.LabelSet{
|
|
{
|
|
model.AddressLabel: model.LabelValue("my.domain"),
|
|
},
|
|
},
|
|
Labels: model.LabelSet{
|
|
fileSDFilepathLabel: model.LabelValue(file),
|
|
},
|
|
Source: fileSource(file, 0),
|
|
},
|
|
{
|
|
Targets: []model.LabelSet{
|
|
{
|
|
model.AddressLabel: model.LabelValue("localhost:9090"),
|
|
},
|
|
},
|
|
Labels: model.LabelSet{
|
|
model.LabelName("foo"): model.LabelValue("bar"),
|
|
model.LabelName("fred"): model.LabelValue("baz"),
|
|
fileSDFilepathLabel: model.LabelValue(file),
|
|
},
|
|
Source: fileSource(file, 1),
|
|
},
|
|
{
|
|
Targets: []model.LabelSet{
|
|
{
|
|
model.AddressLabel: model.LabelValue("example.org:443"),
|
|
},
|
|
},
|
|
Labels: model.LabelSet{
|
|
model.LabelName("scheme"): model.LabelValue("https"),
|
|
fileSDFilepathLabel: model.LabelValue(file),
|
|
},
|
|
Source: fileSource(file, 2),
|
|
},
|
|
}
|
|
}
|
|
|
|
func TestInitialUpdate(t *testing.T) {
|
|
for _, tc := range []string{
|
|
"fixtures/valid.yml",
|
|
"fixtures/valid.json",
|
|
} {
|
|
t.Run(tc, func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
runner := newTestRunner(t)
|
|
sdFile := runner.copyFile(tc)
|
|
|
|
runner.run("*" + filepath.Ext(tc))
|
|
defer runner.stop()
|
|
|
|
// Verify that we receive the initial target groups.
|
|
runner.requireUpdate(time.Time{}, validTg(sdFile))
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestInvalidFile(t *testing.T) {
|
|
for _, tc := range []string{
|
|
"fixtures/invalid_nil.yml",
|
|
"fixtures/invalid_nil.json",
|
|
} {
|
|
t.Run(tc, func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
now := time.Now()
|
|
runner := newTestRunner(t)
|
|
runner.copyFile(tc)
|
|
|
|
runner.run("*" + filepath.Ext(tc))
|
|
defer runner.stop()
|
|
|
|
// Verify that we've received nothing.
|
|
time.Sleep(defaultWait)
|
|
require.False(t, runner.lastReceive().After(now), "unexpected targets received: %v", runner.targets())
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestNoopFileUpdate(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
runner := newTestRunner(t)
|
|
sdFile := runner.copyFile("fixtures/valid.yml")
|
|
|
|
runner.run("*.yml")
|
|
defer runner.stop()
|
|
|
|
// Verify that we receive the initial target groups.
|
|
runner.requireUpdate(time.Time{}, validTg(sdFile))
|
|
|
|
// Verify that we receive an update with the same target groups.
|
|
ref := runner.lastReceive()
|
|
runner.copyFileTo("fixtures/valid3.yml", "valid.yml")
|
|
runner.requireUpdate(ref, validTg(sdFile))
|
|
}
|
|
|
|
func TestFileUpdate(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
runner := newTestRunner(t)
|
|
sdFile := runner.copyFile("fixtures/valid.yml")
|
|
|
|
runner.run("*.yml")
|
|
defer runner.stop()
|
|
|
|
// Verify that we receive the initial target groups.
|
|
runner.requireUpdate(time.Time{}, validTg(sdFile))
|
|
|
|
// Verify that we receive an update with the new target groups.
|
|
ref := runner.lastReceive()
|
|
runner.copyFileTo("fixtures/valid2.yml", "valid.yml")
|
|
runner.requireUpdate(ref, valid2Tg(sdFile))
|
|
}
|
|
|
|
func TestInvalidFileUpdate(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
runner := newTestRunner(t)
|
|
sdFile := runner.copyFile("fixtures/valid.yml")
|
|
|
|
runner.run("*.yml")
|
|
defer runner.stop()
|
|
|
|
// Verify that we receive the initial target groups.
|
|
runner.requireUpdate(time.Time{}, validTg(sdFile))
|
|
|
|
ref := runner.lastReceive()
|
|
runner.writeString(sdFile, "]gibberish\n][")
|
|
|
|
// Verify that we receive nothing or the same targets as before.
|
|
time.Sleep(defaultWait)
|
|
if runner.lastReceive().After(ref) {
|
|
runner.requireTargetGroups(validTg(sdFile), runner.targets())
|
|
}
|
|
}
|
|
|
|
// TestUpdateFileWithPartialWrites checks that a partially-written (invalid
// YAML) file never replaces the previously discovered targets, and that the
// update arrives once the file becomes valid again.
func TestUpdateFileWithPartialWrites(t *testing.T) {
	t.Parallel()

	runner := newTestRunner(t)
	sdFile := runner.copyFile("fixtures/valid.yml")

	runner.run("*.yml")
	defer runner.stop()

	// Verify that we receive the initial target groups.
	runner.requireUpdate(time.Time{}, validTg(sdFile))

	// Do a partial write operation.
	ref := runner.lastReceive()
	runner.writeString(sdFile, "- targets")
	time.Sleep(defaultWait)
	// Verify that we receive nothing or the same target groups as before.
	if runner.lastReceive().After(ref) {
		runner.requireTargetGroups(validTg(sdFile), runner.targets())
	}

	// Verify that we receive the update target groups once the file is a valid YAML payload.
	ref = runner.lastReceive()
	runner.appendString(sdFile, `: ["localhost:9091"]`)
	runner.requireUpdate(ref,
		[]*targetgroup.Group{
			{
				Targets: []model.LabelSet{
					{
						model.AddressLabel: model.LabelValue("localhost:9091"),
					},
				},
				Labels: model.LabelSet{
					fileSDFilepathLabel: model.LabelValue(sdFile),
				},
				Source: fileSource(sdFile, 0),
			},
			{
				// The file now holds a single group, so the second source is
				// cleared with an empty group.
				Source: fileSource(sdFile, 1),
			},
		},
	)
}
|
|
|
|
func TestRemoveFile(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
runner := newTestRunner(t)
|
|
sdFile := runner.copyFile("fixtures/valid.yml")
|
|
|
|
runner.run("*.yml")
|
|
defer runner.stop()
|
|
|
|
// Verify that we receive the initial target groups.
|
|
runner.requireUpdate(time.Time{}, validTg(sdFile))
|
|
|
|
// Verify that we receive the update about the target groups being removed.
|
|
ref := runner.lastReceive()
|
|
require.NoError(t, os.Remove(sdFile))
|
|
runner.requireUpdate(
|
|
ref,
|
|
[]*targetgroup.Group{
|
|
{
|
|
Source: fileSource(sdFile, 0),
|
|
},
|
|
{
|
|
Source: fileSource(sdFile, 1),
|
|
},
|
|
},
|
|
)
|
|
}
|