diff --git a/discovery/file/file_test.go b/discovery/file/file_test.go index 16d7977452..d30ef491c5 100644 --- a/discovery/file/file_test.go +++ b/discovery/file/file_test.go @@ -15,158 +15,462 @@ package file import ( "context" + "encoding/json" "io" + "io/ioutil" "os" "path/filepath" + "sort" + "sync" "testing" "time" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/discovery/targetgroup" + "github.com/prometheus/prometheus/util/testutil" ) -const testDir = "fixtures" +const defaultWait = time.Second -func TestFileSD(t *testing.T) { - defer os.Remove(filepath.Join(testDir, "_test_valid.yml")) - defer os.Remove(filepath.Join(testDir, "_test_valid.json")) - defer os.Remove(filepath.Join(testDir, "_test_invalid_nil.json")) - defer os.Remove(filepath.Join(testDir, "_test_invalid_nil.yml")) - testFileSD(t, "valid", ".yml", true) - testFileSD(t, "valid", ".json", true) - testFileSD(t, "invalid_nil", ".json", false) - testFileSD(t, "invalid_nil", ".yml", false) +type testRunner struct { + *testing.T + dir string + ch chan []*targetgroup.Group + done, stopped chan struct{} + cancelSD context.CancelFunc + + mtx sync.Mutex + tgs map[string]*targetgroup.Group + receivedAt time.Time } -func testFileSD(t *testing.T, prefix, ext string, expect bool) { - // As interval refreshing is more of a fallback, we only want to test - // whether file watches work as expected. - var conf SDConfig - conf.Files = []string{filepath.Join(testDir, "_*"+ext)} - conf.RefreshInterval = model.Duration(1 * time.Hour) +func newTestRunner(t *testing.T) *testRunner { + t.Helper() - var ( - fsd = NewDiscovery(&conf, nil) - ch = make(chan []*targetgroup.Group) - ctx, cancel = context.WithCancel(context.Background()) - ) - go fsd.Run(ctx, ch) + tmpDir, err := ioutil.TempDir("", "prometheus-file-sd") + testutil.Ok(t, err) - select { - case <-time.After(25 * time.Millisecond): - // Expected. - case tgs := <-ch: - t.Fatalf("Unexpected target groups in file discovery: %s", tgs) + return &testRunner{ + T: t, + dir: tmpDir, + ch: make(chan []*targetgroup.Group), + done: make(chan struct{}), + stopped: make(chan struct{}), + tgs: make(map[string]*targetgroup.Group), } +} - // To avoid empty group struct sent from the discovery caused by invalid fsnotify updates, - // drain the channel until we are ready with the test files. - fileReady := make(chan struct{}) - drainReady := make(chan struct{}) +// copyFile atomically copies a file to the runner's directory. +func (t *testRunner) copyFile(src string) string { + t.Helper() + return t.copyFileTo(src, filepath.Base(src)) +} + +// copyFileTo atomically copies a file with a different name to the runner's directory. +func (t *testRunner) copyFileTo(src string, name string) string { + t.Helper() + + newf, err := ioutil.TempFile(t.dir, "") + testutil.Ok(t, err) + + f, err := os.Open(src) + testutil.Ok(t, err) + + _, err = io.Copy(newf, f) + testutil.Ok(t, err) + testutil.Ok(t, f.Close()) + + dst := filepath.Join(t.dir, name) + err = os.Rename(newf.Name(), dst) + testutil.Ok(t, err) + + return dst +} + +// writeString writes atomically a string to a file. +func (t *testRunner) writeString(file string, data string) { + t.Helper() + + newf, err := ioutil.TempFile(t.dir, "") + testutil.Ok(t, err) + + _, err = newf.WriteString(data) + testutil.Ok(t, err) + testutil.Ok(t, newf.Close()) + + err = os.Rename(newf.Name(), file) + testutil.Ok(t, err) +} + +// appendString appends a string to a file. +func (t *testRunner) appendString(file, data string) { + t.Helper() + + f, err := os.OpenFile(file, os.O_WRONLY|os.O_APPEND, 0) + testutil.Ok(t, err) + defer f.Close() + + _, err = f.WriteString(data) + testutil.Ok(t, err) +} + +// run starts the file SD and the loop receiving target groups updates. +func (t *testRunner) run(files ...string) { go func() { + defer close(t.stopped) for { select { - case <-ch: - case <-fileReady: - close(drainReady) + case <-t.done: + os.RemoveAll(t.dir) return + case tgs := <-t.ch: + t.mtx.Lock() + t.receivedAt = time.Now() + for _, tg := range tgs { + t.tgs[tg.Source] = tg + } + t.mtx.Unlock() } } }() - newf, err := os.Create(filepath.Join(testDir, "_test_"+prefix+ext)) - if err != nil { - t.Fatal(err) + for i := range files { + files[i] = filepath.Join(t.dir, files[i]) } - defer newf.Close() + ctx, cancel := context.WithCancel(context.Background()) + t.cancelSD = cancel + go func() { + NewDiscovery( + &SDConfig{ + Files: files, + // Setting a high refresh interval to make sure that the tests only + // rely on file watches. + RefreshInterval: model.Duration(1 * time.Hour), + }, + nil, + ).Run(ctx, t.ch) + }() +} - f, err := os.Open(filepath.Join(testDir, prefix+ext)) - if err != nil { - t.Fatal(err) +func (t *testRunner) stop() { + t.cancelSD() + close(t.done) + <-t.stopped +} + +func (t *testRunner) lastReceive() time.Time { + t.mtx.Lock() + defer t.mtx.Unlock() + return t.receivedAt +} + +func (t *testRunner) targets() []*targetgroup.Group { + t.mtx.Lock() + defer t.mtx.Unlock() + var ( + keys []string + tgs []*targetgroup.Group + ) + + for k := range t.tgs { + keys = append(keys, k) } - defer f.Close() - _, err = io.Copy(newf, f) - if err != nil { - t.Fatal(err) + sort.Strings(keys) + for _, k := range keys { + tgs = append(tgs, t.tgs[k]) } + return tgs +} - // Test file is ready so stop draining the discovery channel. - // It contains two target groups. - close(fileReady) - <-drainReady - newf.WriteString(" ") // One last meaningless write to trigger fsnotify and a new loop of the discovery service. +func (t *testRunner) requireUpdate(ref time.Time, expected []*targetgroup.Group) { + t.Helper() - timeout := time.After(15 * time.Second) -retry: for { select { - case <-timeout: - if expect { - t.Fatalf("Expected new target group but got none") - } else { - // Invalid type fsd should always break down. - break retry - } - case tgs := <-ch: - if !expect { - t.Fatalf("Unexpected target groups %s, we expected a failure here.", tgs) + case <-time.After(defaultWait): + t.Fatalf("Expected update but got none") + return + case <-time.After(defaultWait / 10): + if ref.Equal(t.lastReceive()) { + // No update received. + break } - if len(tgs) != 2 { - continue retry // Potentially a partial write, just retry. + // We can receive partial updates so only check the result when the + // expected number of groups is reached. + tgs := t.targets() + if len(tgs) != len(expected) { + t.Logf("skipping update: expected %d targets, got %d", len(expected), len(tgs)) + break } - tg := tgs[0] - - if _, ok := tg.Labels["foo"]; !ok { - t.Fatalf("Label not parsed") + t.requireTargetGroups(expected, tgs) + if ref.After(time.Time{}) { + t.Logf("update received after %v", t.lastReceive().Sub(ref)) } - if tg.String() != filepath.Join(testDir, "_test_"+prefix+ext+":0") { - t.Fatalf("Unexpected target group %s", tg) - } - - tg = tgs[1] - if tg.String() != filepath.Join(testDir, "_test_"+prefix+ext+":1") { - t.Fatalf("Unexpected target groups %s", tg) - } - break retry + return } } - - // Based on unknown circumstances, sometimes fsnotify will trigger more events in - // some runs (which might be empty, chains of different operations etc.). - // We have to drain those (as the target manager would) to avoid deadlocking and must - // not try to make sense of it all... - drained := make(chan struct{}) - go func() { - for { - select { - case tgs := <-ch: - // Below we will change the file to a bad syntax. Previously extracted target - // groups must not be deleted via sending an empty target group. - if len(tgs[0].Targets) == 0 { - t.Errorf("Unexpected empty target groups received: %s", tgs) - } - case <-time.After(500 * time.Millisecond): - close(drained) - return - } - } - }() - - newf, err = os.Create(filepath.Join(testDir, "_test.new")) - if err != nil { - t.Fatal(err) - } - defer os.Remove(newf.Name()) - - if _, err := newf.Write([]byte("]gibberish\n][")); err != nil { - t.Fatal(err) - } - newf.Close() - - os.Rename(newf.Name(), filepath.Join(testDir, "_test_"+prefix+ext)) - - cancel() - <-drained +} + +func (t *testRunner) requireTargetGroups(expected, got []*targetgroup.Group) { + t.Helper() + b1, err := json.Marshal(expected) + if err != nil { + panic(err) + } + b2, err := json.Marshal(got) + if err != nil { + panic(err) + } + + testutil.Equals(t, string(b1), string(b2)) +} + +// validTg() maps to fixtures/valid.{json,yml}. +func validTg(file string) []*targetgroup.Group { + return []*targetgroup.Group{ + &targetgroup.Group{ + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue("localhost:9090"), + }, + { + model.AddressLabel: model.LabelValue("example.org:443"), + }, + }, + Labels: model.LabelSet{ + model.LabelName("foo"): model.LabelValue("bar"), + fileSDFilepathLabel: model.LabelValue(file), + }, + Source: fileSource(file, 0), + }, + &targetgroup.Group{ + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue("my.domain"), + }, + }, + Labels: model.LabelSet{ + fileSDFilepathLabel: model.LabelValue(file), + }, + Source: fileSource(file, 1), + }, + } +} + +// valid2Tg() maps to fixtures/valid2.{json,yml}. +func valid2Tg(file string) []*targetgroup.Group { + return []*targetgroup.Group{ + &targetgroup.Group{ + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue("my.domain"), + }, + }, + Labels: model.LabelSet{ + fileSDFilepathLabel: model.LabelValue(file), + }, + Source: fileSource(file, 0), + }, + &targetgroup.Group{ + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue("localhost:9090"), + }, + }, + Labels: model.LabelSet{ + model.LabelName("foo"): model.LabelValue("bar"), + model.LabelName("fred"): model.LabelValue("baz"), + fileSDFilepathLabel: model.LabelValue(file), + }, + Source: fileSource(file, 1), + }, + &targetgroup.Group{ + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue("example.org:443"), + }, + }, + Labels: model.LabelSet{ + model.LabelName("scheme"): model.LabelValue("https"), + fileSDFilepathLabel: model.LabelValue(file), + }, + Source: fileSource(file, 2), + }, + } +} + +func TestInitialUpdate(t *testing.T) { + for _, tc := range []string{ + "fixtures/valid.yml", + "fixtures/valid.json", + } { + t.Run(tc, func(t *testing.T) { + t.Parallel() + + runner := newTestRunner(t) + sdFile := runner.copyFile(tc) + + runner.run("*" + filepath.Ext(tc)) + defer runner.stop() + + // Verify that we receive the initial target groups. + runner.requireUpdate(time.Time{}, validTg(sdFile)) + }) + } +} + +func TestInvalidFile(t *testing.T) { + for _, tc := range []string{ + "fixtures/invalid_nil.yml", + "fixtures/invalid_nil.json", + } { + tc := tc + t.Run(tc, func(t *testing.T) { + t.Parallel() + + now := time.Now() + runner := newTestRunner(t) + runner.copyFile(tc) + + runner.run("*" + filepath.Ext(tc)) + defer runner.stop() + + // Verify that we've received nothing. + time.Sleep(defaultWait) + if runner.lastReceive().After(now) { + t.Fatalf("unexpected targets received: %v", runner.targets()) + } + }) + } +} + +func TestNoopFileUpdate(t *testing.T) { + t.Parallel() + + runner := newTestRunner(t) + sdFile := runner.copyFile("fixtures/valid.yml") + + runner.run("*.yml") + defer runner.stop() + + // Verify that we receive the initial target groups. + runner.requireUpdate(time.Time{}, validTg(sdFile)) + + // Verify that we receive an update with the same target groups. + ref := runner.lastReceive() + runner.copyFileTo("fixtures/valid3.yml", "valid.yml") + runner.requireUpdate(ref, validTg(sdFile)) +} + +func TestFileUpdate(t *testing.T) { + t.Parallel() + + runner := newTestRunner(t) + sdFile := runner.copyFile("fixtures/valid.yml") + + runner.run("*.yml") + defer runner.stop() + + // Verify that we receive the initial target groups. + runner.requireUpdate(time.Time{}, validTg(sdFile)) + + // Verify that we receive an update with the new target groups. + ref := runner.lastReceive() + runner.copyFileTo("fixtures/valid2.yml", "valid.yml") + runner.requireUpdate(ref, valid2Tg(sdFile)) +} + +func TestInvalidFileUpdate(t *testing.T) { + t.Parallel() + + runner := newTestRunner(t) + sdFile := runner.copyFile("fixtures/valid.yml") + + runner.run("*.yml") + defer runner.stop() + + // Verify that we receive the initial target groups. + runner.requireUpdate(time.Time{}, validTg(sdFile)) + + ref := runner.lastReceive() + runner.writeString(sdFile, "]gibberish\n][") + + // Verify that we receive nothing or the same targets as before. + time.Sleep(defaultWait) + if runner.lastReceive().After(ref) { + runner.requireTargetGroups(validTg(sdFile), runner.targets()) + } +} + +func TestUpdateFileWithPartialWrites(t *testing.T) { + t.Parallel() + + runner := newTestRunner(t) + sdFile := runner.copyFile("fixtures/valid.yml") + + runner.run("*.yml") + defer runner.stop() + + // Verify that we receive the initial target groups. + runner.requireUpdate(time.Time{}, validTg(sdFile)) + + // Do a partial write operation. + ref := runner.lastReceive() + runner.writeString(sdFile, "- targets") + time.Sleep(defaultWait) + // Verify that we receive nothing or the same target groups as before. + if runner.lastReceive().After(ref) { + runner.requireTargetGroups(validTg(sdFile), runner.targets()) + } + + // Verify that we receive the update target groups once the file is a valid YAML payload. + ref = runner.lastReceive() + runner.appendString(sdFile, `: ["localhost:9091"]`) + runner.requireUpdate(ref, + []*targetgroup.Group{ + &targetgroup.Group{ + Targets: []model.LabelSet{ + { + model.AddressLabel: model.LabelValue("localhost:9091"), + }, + }, + Labels: model.LabelSet{ + fileSDFilepathLabel: model.LabelValue(sdFile), + }, + Source: fileSource(sdFile, 0), + }, + &targetgroup.Group{ + Source: fileSource(sdFile, 1), + }, + }, + ) +} + +func TestRemoveFile(t *testing.T) { + t.Parallel() + + runner := newTestRunner(t) + sdFile := runner.copyFile("fixtures/valid.yml") + + runner.run("*.yml") + defer runner.stop() + + // Verify that we receive the initial target groups. + runner.requireUpdate(time.Time{}, validTg(sdFile)) + + // Verify that we receive the update about the target groups being removed. + ref := runner.lastReceive() + testutil.Ok(t, os.Remove(sdFile)) + runner.requireUpdate( + ref, + []*targetgroup.Group{ + &targetgroup.Group{ + Source: fileSource(sdFile, 0), + }, + &targetgroup.Group{ + Source: fileSource(sdFile, 1), + }}, + ) } diff --git a/discovery/file/fixtures/valid2.yml b/discovery/file/fixtures/valid2.yml new file mode 100644 index 0000000000..428f606544 --- /dev/null +++ b/discovery/file/fixtures/valid2.yml @@ -0,0 +1,8 @@ +- targets: ['my.domain'] +- targets: ['localhost:9090'] + labels: + foo: bar + fred: baz +- targets: ['example.org:443'] + labels: + scheme: https diff --git a/discovery/file/fixtures/valid3.yml b/discovery/file/fixtures/valid3.yml new file mode 100644 index 0000000000..c4f5214805 --- /dev/null +++ b/discovery/file/fixtures/valid3.yml @@ -0,0 +1,7 @@ +# the YAML structure is identical to valid.yml but the raw data is different. +- targets: ['localhost:9090', 'example.org:443'] + labels: + foo: bar + +- targets: ['my.domain'] +