prometheus/discovery/file/file_test.go
Munem Hashmi 89b3ad45a8
discovery/file: restore atomic file writes in tests (#18259)
PR #17269 replaced atomic os.Rename-based file writes with
os.WriteFile to fix a Windows flake. However, os.WriteFile is not
atomic (it truncates then writes), and fsnotify can fire between
the truncate and write, causing the watcher to read an empty file
and replace valid targets with empty ones.

Restore atomicity by writing to a temporary file and renaming.
On Windows, retry the rename with a short backoff to handle
transient "Access is denied" errors when the file watcher or
readFile holds an open handle to the destination.

Fixes #18237

Signed-off-by: Munem Hashmi <munem.hashmi@gmail.com>
2026-03-11 08:12:56 +01:00

505 lines
12 KiB
Go

// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package file
import (
"context"
"encoding/json"
"os"
"path/filepath"
"sort"
"sync"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
)
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
const defaultWait = time.Second
type testRunner struct {
*testing.T
dir string
ch chan []*targetgroup.Group
done, stopped chan struct{}
cancelSD context.CancelFunc
mtx sync.Mutex
tgs map[string]*targetgroup.Group
receivedAt time.Time
}
func newTestRunner(t *testing.T) *testRunner {
t.Helper()
return &testRunner{
T: t,
dir: t.TempDir(),
ch: make(chan []*targetgroup.Group),
done: make(chan struct{}),
stopped: make(chan struct{}),
tgs: make(map[string]*targetgroup.Group),
}
}
// copyFile copies a file to the runner's directory.
func (t *testRunner) copyFile(src string) string {
t.Helper()
return t.copyFileTo(src, filepath.Base(src))
}
// copyFileTo atomically copies a file with a different name to the runner's directory.
func (t *testRunner) copyFileTo(src, name string) string {
t.Helper()
content, err := os.ReadFile(src)
require.NoError(t, err)
dst := filepath.Join(t.dir, name)
t.atomicWrite(dst, content)
return dst
}
// writeString atomically writes a string to a file.
func (t *testRunner) writeString(file, data string) {
t.Helper()
t.atomicWrite(file, []byte(data))
}
// atomicWrite writes data to dst atomically by writing to a temporary file
// and renaming it. The temp file is created outside the watched directory to
// avoid triggering spurious fsnotify events that could cause readFile to hold
// an open handle on dst (which would make os.Rename fail on Windows).
func (t *testRunner) atomicWrite(dst string, data []byte) {
t.Helper()
// Create the temp file via t.TempDir() rather than in t.dir (the watched
// directory). t.TempDir() returns a fresh directory on the same filesystem
// as t.dir, so os.Rename works, and cleanup is handled by the test framework.
tmp, err := os.CreateTemp(t.TempDir(), ".sd-test-*")
require.NoError(t, err)
_, err = tmp.Write(data)
require.NoError(t, err)
require.NoError(t, tmp.Close())
// On Windows, os.Rename fails if another process holds an open handle
// on dst. This can happen if a previous refresh cycle's readFile call
// hasn't released the file yet. Retry a few times to handle this;
// on Linux/macOS os.Rename always succeeds regardless, so the retry
// never triggers.
for retries := 0; ; retries++ {
err = os.Rename(tmp.Name(), dst)
if err == nil || retries >= 5 {
break
}
time.Sleep(50 * time.Millisecond)
}
require.NoError(t, err)
}
// appendString appends a string to a file.
func (t *testRunner) appendString(file, data string) {
t.Helper()
f, err := os.OpenFile(file, os.O_WRONLY|os.O_APPEND, 0)
require.NoError(t, err)
defer f.Close()
_, err = f.WriteString(data)
require.NoError(t, err)
}
// run starts the file SD and the loop receiving target groups updates.
func (t *testRunner) run(files ...string) {
go func() {
defer close(t.stopped)
for {
select {
case <-t.done:
os.RemoveAll(t.dir)
return
case tgs := <-t.ch:
t.mtx.Lock()
t.receivedAt = time.Now()
for _, tg := range tgs {
t.tgs[tg.Source] = tg
}
t.mtx.Unlock()
}
}
}()
for i := range files {
files[i] = filepath.Join(t.dir, files[i])
}
ctx, cancel := context.WithCancel(context.Background())
t.cancelSD = cancel
go func() {
conf := &SDConfig{
Files: files,
// Setting a high refresh interval to make sure that the tests only
// rely on file watches.
RefreshInterval: model.Duration(1 * time.Hour),
}
reg := prometheus.NewRegistry()
refreshMetrics := discovery.NewRefreshMetrics(reg)
metrics := conf.NewDiscovererMetrics(reg, refreshMetrics)
require.NoError(t, metrics.Register())
d, err := NewDiscovery(
conf,
nil,
metrics,
)
require.NoError(t, err)
d.Run(ctx, t.ch)
metrics.Unregister()
}()
}
func (t *testRunner) stop() {
t.cancelSD()
close(t.done)
<-t.stopped
}
func (t *testRunner) lastReceive() time.Time {
t.mtx.Lock()
defer t.mtx.Unlock()
return t.receivedAt
}
func (t *testRunner) targets() []*targetgroup.Group {
t.mtx.Lock()
defer t.mtx.Unlock()
var (
keys []string
tgs []*targetgroup.Group
)
for k := range t.tgs {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
tgs = append(tgs, t.tgs[k])
}
return tgs
}
func (t *testRunner) requireUpdate(ref time.Time, expected []*targetgroup.Group) {
t.Helper()
timeout := time.After(defaultWait)
for {
select {
case <-timeout:
t.Fatalf("Expected update but got none")
case <-time.After(defaultWait / 10):
if ref.Equal(t.lastReceive()) {
// No update received.
break
}
// We can receive partial updates so only check the result when the
// expected number of groups is reached.
tgs := t.targets()
if len(tgs) != len(expected) {
t.Logf("skipping update: expected %d targets, got %d", len(expected), len(tgs))
break
}
t.requireTargetGroups(expected, tgs)
if ref.After(time.Time{}) {
t.Logf("update received after %v", t.lastReceive().Sub(ref))
}
return
}
}
}
func (t *testRunner) requireTargetGroups(expected, got []*targetgroup.Group) {
t.Helper()
b1, err := json.Marshal(expected)
if err != nil {
panic(err)
}
b2, err := json.Marshal(got)
if err != nil {
panic(err)
}
require.Equal(t, string(b1), string(b2))
}
// validTg() maps to fixtures/valid.{json,yml}.
func validTg(file string) []*targetgroup.Group {
return []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue("localhost:9090"),
},
{
model.AddressLabel: model.LabelValue("example.org:443"),
},
},
Labels: model.LabelSet{
model.LabelName("foo"): model.LabelValue("bar"),
fileSDFilepathLabel: model.LabelValue(file),
},
Source: fileSource(file, 0),
},
{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue("my.domain"),
},
},
Labels: model.LabelSet{
fileSDFilepathLabel: model.LabelValue(file),
},
Source: fileSource(file, 1),
},
}
}
// valid2Tg() maps to fixtures/valid2.{json,yml}.
func valid2Tg(file string) []*targetgroup.Group {
return []*targetgroup.Group{
{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue("my.domain"),
},
},
Labels: model.LabelSet{
fileSDFilepathLabel: model.LabelValue(file),
},
Source: fileSource(file, 0),
},
{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue("localhost:9090"),
},
},
Labels: model.LabelSet{
model.LabelName("foo"): model.LabelValue("bar"),
model.LabelName("fred"): model.LabelValue("baz"),
fileSDFilepathLabel: model.LabelValue(file),
},
Source: fileSource(file, 1),
},
{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue("example.org:443"),
},
},
Labels: model.LabelSet{
model.LabelName("scheme"): model.LabelValue("https"),
fileSDFilepathLabel: model.LabelValue(file),
},
Source: fileSource(file, 2),
},
}
}
func TestInitialUpdate(t *testing.T) {
for _, tc := range []string{
"fixtures/valid.yml",
"fixtures/valid.json",
} {
t.Run(tc, func(t *testing.T) {
t.Parallel()
runner := newTestRunner(t)
sdFile := runner.copyFile(tc)
runner.run("*" + filepath.Ext(tc))
defer runner.stop()
// Verify that we receive the initial target groups.
runner.requireUpdate(time.Time{}, validTg(sdFile))
})
}
}
func TestInvalidFile(t *testing.T) {
for _, tc := range []string{
"fixtures/invalid_nil.yml",
"fixtures/invalid_nil.json",
} {
t.Run(tc, func(t *testing.T) {
t.Parallel()
now := time.Now()
runner := newTestRunner(t)
runner.copyFile(tc)
runner.run("*" + filepath.Ext(tc))
defer runner.stop()
// Verify that we've received nothing.
time.Sleep(defaultWait)
require.False(t, runner.lastReceive().After(now), "unexpected targets received: %v", runner.targets())
})
}
}
func TestNoopFileUpdate(t *testing.T) {
t.Parallel()
runner := newTestRunner(t)
sdFile := runner.copyFile("fixtures/valid.yml")
runner.run("*.yml")
defer runner.stop()
// Verify that we receive the initial target groups.
runner.requireUpdate(time.Time{}, validTg(sdFile))
// Verify that we receive an update with the same target groups.
ref := runner.lastReceive()
runner.copyFileTo("fixtures/valid3.yml", "valid.yml")
runner.requireUpdate(ref, validTg(sdFile))
}
func TestFileUpdate(t *testing.T) {
t.Parallel()
runner := newTestRunner(t)
sdFile := runner.copyFile("fixtures/valid.yml")
runner.run("*.yml")
defer runner.stop()
// Verify that we receive the initial target groups.
runner.requireUpdate(time.Time{}, validTg(sdFile))
// Verify that we receive an update with the new target groups.
ref := runner.lastReceive()
runner.copyFileTo("fixtures/valid2.yml", "valid.yml")
runner.requireUpdate(ref, valid2Tg(sdFile))
}
func TestInvalidFileUpdate(t *testing.T) {
t.Parallel()
runner := newTestRunner(t)
sdFile := runner.copyFile("fixtures/valid.yml")
runner.run("*.yml")
defer runner.stop()
// Verify that we receive the initial target groups.
runner.requireUpdate(time.Time{}, validTg(sdFile))
ref := runner.lastReceive()
runner.writeString(sdFile, "]gibberish\n][")
// Verify that we receive nothing or the same targets as before.
time.Sleep(defaultWait)
if runner.lastReceive().After(ref) {
runner.requireTargetGroups(validTg(sdFile), runner.targets())
}
}
func TestUpdateFileWithPartialWrites(t *testing.T) {
t.Parallel()
runner := newTestRunner(t)
sdFile := runner.copyFile("fixtures/valid.yml")
runner.run("*.yml")
defer runner.stop()
// Verify that we receive the initial target groups.
runner.requireUpdate(time.Time{}, validTg(sdFile))
// Do a partial write operation.
ref := runner.lastReceive()
runner.writeString(sdFile, "- targets")
time.Sleep(defaultWait)
// Verify that we receive nothing or the same target groups as before.
if runner.lastReceive().After(ref) {
runner.requireTargetGroups(validTg(sdFile), runner.targets())
}
// Verify that we receive the update target groups once the file is a valid YAML payload.
ref = runner.lastReceive()
runner.appendString(sdFile, `: ["localhost:9091"]`)
runner.requireUpdate(ref,
[]*targetgroup.Group{
{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue("localhost:9091"),
},
},
Labels: model.LabelSet{
fileSDFilepathLabel: model.LabelValue(sdFile),
},
Source: fileSource(sdFile, 0),
},
{
Source: fileSource(sdFile, 1),
},
},
)
}
func TestRemoveFile(t *testing.T) {
t.Parallel()
runner := newTestRunner(t)
sdFile := runner.copyFile("fixtures/valid.yml")
runner.run("*.yml")
defer runner.stop()
// Verify that we receive the initial target groups.
runner.requireUpdate(time.Time{}, validTg(sdFile))
// Verify that we receive the update about the target groups being removed.
ref := runner.lastReceive()
require.NoError(t, os.Remove(sdFile))
runner.requireUpdate(
ref,
[]*targetgroup.Group{
{
Source: fileSource(sdFile, 0),
},
{
Source: fileSource(sdFile, 1),
},
},
)
}