diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 53d2fe12cc..49e05e9243 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -117,8 +117,8 @@ func Main() int {
 	}
 
 	var (
-		notifier       = notifier.New(&cfg.notifier)
-		targetManager  = retrieval.NewTargetManager(sampleAppender)
+		notifier       = notifier.New(&cfg.notifier, log.Base())
+		targetManager  = retrieval.NewTargetManager(sampleAppender, log.Base())
 		queryEngine    = promql.NewEngine(queryable, &cfg.queryEngine)
 		ctx, cancelCtx = context.WithCancel(context.Background())
 	)
diff --git a/discovery/azure/azure.go b/discovery/azure/azure.go
index d03257a067..4f96e5a216 100644
--- a/discovery/azure/azure.go
+++ b/discovery/azure/azure.go
@@ -66,14 +66,16 @@ type Discovery struct {
 	cfg      *config.AzureSDConfig
 	interval time.Duration
 	port     int
+	logger   log.Logger
 }
 
 // NewDiscovery returns a new AzureDiscovery which periodically refreshes its targets.
-func NewDiscovery(cfg *config.AzureSDConfig) *Discovery {
+func NewDiscovery(cfg *config.AzureSDConfig, logger log.Logger) *Discovery {
 	return &Discovery{
 		cfg:      cfg,
 		interval: time.Duration(cfg.RefreshInterval),
 		port:     cfg.Port,
+		logger:   logger,
 	}
 }
 
@@ -91,7 +93,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 
 		tg, err := d.refresh()
 		if err != nil {
-			log.Errorf("unable to refresh during Azure discovery: %s", err)
+			d.logger.Errorf("unable to refresh during Azure discovery: %s", err)
 		} else {
 			select {
 			case <-ctx.Done():
@@ -141,13 +143,13 @@ type azureResource struct {
 }
 
 // Create a new azureResource object from an ID string.
-func newAzureResourceFromID(id string) (azureResource, error) {
+func newAzureResourceFromID(id string, logger log.Logger) (azureResource, error) {
 	// Resource IDs have the following format.
 	// /subscriptions/SUBSCRIPTION_ID/resourceGroups/RESOURCE_GROUP/providers/PROVIDER/TYPE/NAME
 	s := strings.Split(id, "/")
 	if len(s) != 9 {
 		err := fmt.Errorf("invalid ID '%s'. Refusing to create azureResource", id)
-		log.Error(err)
+		logger.Error(err)
 		return azureResource{}, err
 	}
 	return azureResource{
@@ -185,7 +187,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
 		}
 		machines = append(machines, *result.Value...)
 	}
-	log.Debugf("Found %d virtual machines during Azure discovery.", len(machines))
+	d.logger.Debugf("Found %d virtual machines during Azure discovery.", len(machines))
 
 	// We have the slice of machines. Now turn them into targets.
 	// Doing them in go routines because the network interface calls are slow.
@@ -197,7 +199,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
 	ch := make(chan target, len(machines))
 	for i, vm := range machines {
 		go func(i int, vm compute.VirtualMachine) {
-			r, err := newAzureResourceFromID(*vm.ID)
+			r, err := newAzureResourceFromID(*vm.ID, d.logger)
 			if err != nil {
 				ch <- target{labelSet: nil, err: err}
 				return
@@ -219,14 +221,14 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
 
 			// Get the IP address information via separate call to the network provider.
 			for _, nic := range *vm.Properties.NetworkProfile.NetworkInterfaces {
-				r, err := newAzureResourceFromID(*nic.ID)
+				r, err := newAzureResourceFromID(*nic.ID, d.logger)
 				if err != nil {
 					ch <- target{labelSet: nil, err: err}
 					return
 				}
 				networkInterface, err := client.nic.Get(r.ResourceGroup, r.Name, "")
 				if err != nil {
-					log.Errorf("Unable to get network interface %s: %s", r.Name, err)
+					d.logger.Errorf("Unable to get network interface %s: %s", r.Name, err)
 					ch <- target{labelSet: nil, err: err}
 					// Get out of this routine because we cannot continue without a network interface.
 					return
@@ -237,7 +239,7 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
 				// yet support this. On deallocated machines, this value happens to be nil so it
 				// is a cheap and easy way to determine if a machine is allocated or not.
 				if networkInterface.Properties.Primary == nil {
-					log.Debugf("Virtual machine %s is deallocated. Skipping during Azure SD.", *vm.Name)
+					d.logger.Debugf("Virtual machine %s is deallocated. Skipping during Azure SD.", *vm.Name)
 					ch <- target{}
 					return
 				}
@@ -272,6 +274,6 @@ func (d *Discovery) refresh() (tg *config.TargetGroup, err error) {
 		}
 	}
 
-	log.Debugf("Azure discovery completed.")
+	d.logger.Debugf("Azure discovery completed.")
 	return tg, nil
 }
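The change is plain constructor injection: the caller now decides where discovery logs go instead of the package-level default. A minimal sketch of a call site, assuming the post-patch signatures above; the `component` label key and the empty config are illustrative, not taken from the patch:

```go
package main

import (
	"github.com/prometheus/common/log"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery/azure"
)

func main() {
	// Derive a child logger from the process-wide default; every entry it
	// emits carries the extra field. The label key is hypothetical.
	logger := log.Base().With("component", "azure_discovery")

	// A zero-value config discovers nothing; real callers fill it from YAML.
	d := azure.NewDiscovery(&config.AzureSDConfig{}, logger)
	_ = d // wiring sketch only; d.Run would start the refresh loop
}
```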
diff --git a/discovery/consul/consul.go b/discovery/consul/consul.go
index 952a7fa62a..955c6924c4 100644
--- a/discovery/consul/consul.go
+++ b/discovery/consul/consul.go
@@ -89,10 +89,11 @@ type Discovery struct {
 	clientDatacenter string
 	tagSeparator     string
 	watchedServices  []string // Set of services which will be discovered.
+	logger           log.Logger
 }
 
 // NewDiscovery returns a new Discovery for the given config.
-func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) {
+func NewDiscovery(conf *config.ConsulSDConfig, logger log.Logger) (*Discovery, error) {
 	tls, err := httputil.NewTLSConfig(conf.TLSConfig)
 	if err != nil {
 		return nil, err
@@ -121,6 +122,7 @@ func NewDiscovery(conf *config.ConsulSDConfig) (*Discovery, error) {
 		tagSeparator:     conf.TagSeparator,
 		watchedServices:  conf.Services,
 		clientDatacenter: clientConf.Datacenter,
+		logger:           logger,
 	}
 	return cd, nil
 }
@@ -163,7 +165,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 		}
 
 		if err != nil {
-			log.Errorf("Error refreshing service list: %s", err)
+			d.logger.Errorf("Error refreshing service list: %s", err)
 			rpcFailuresCount.Inc()
 			time.Sleep(retryInterval)
 			continue
@@ -179,7 +181,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 		if d.clientDatacenter == "" {
 			info, err := d.client.Agent().Self()
 			if err != nil {
-				log.Errorf("Error retrieving datacenter name: %s", err)
+				d.logger.Errorf("Error retrieving datacenter name: %s", err)
 				time.Sleep(retryInterval)
 				continue
 			}
@@ -203,6 +205,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 				datacenterLabel: model.LabelValue(d.clientDatacenter),
 			},
 			tagSeparator: d.tagSeparator,
+			logger:       d.logger,
 		}
 
 		wctx, cancel := context.WithCancel(ctx)
@@ -235,6 +238,7 @@ type consulService struct {
 	labels       model.LabelSet
 	client       *consul.Client
 	tagSeparator string
+	logger       log.Logger
 }
 
 func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetGroup) {
@@ -258,7 +262,7 @@ func (srv *consulService) watch(ctx context.Context, ch chan<- []*config.TargetG
 		}
 
 		if err != nil {
-			log.Errorf("Error refreshing service %s: %s", srv.name, err)
+			srv.logger.Errorf("Error refreshing service %s: %s", srv.name, err)
 			rpcFailuresCount.Inc()
 			time.Sleep(retryInterval)
 			continue
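consulService now carries the Discovery's logger, so the per-service watch goroutines report through the same injected sink as their parent. A stripped-down sketch of that hand-off, with a hypothetical `watcher` type standing in for consulService:

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/log"
)

// watcher is a stand-in for consulService: a child worker that inherits
// the logger owned by its parent instead of using the package default.
type watcher struct {
	name   string
	logger log.Logger
}

func (w *watcher) refresh() {
	err := fmt.Errorf("connection refused") // hypothetical Consul RPC failure
	w.logger.Errorf("Error refreshing service %s: %s", w.name, err)
}

func main() {
	parent := log.Base()
	w := &watcher{name: "web", logger: parent} // mirrors the hand-off in the hunk above
	w.refresh()
}
```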
diff --git a/discovery/consul/consul_test.go b/discovery/consul/consul_test.go
index 2040879b90..1e7cfd5292 100644
--- a/discovery/consul/consul_test.go
+++ b/discovery/consul/consul_test.go
@@ -16,13 +16,14 @@ package consul
 import (
 	"testing"
 
+	"github.com/prometheus/common/log"
 	"github.com/prometheus/prometheus/config"
 )
 
 func TestConfiguredService(t *testing.T) {
 	conf := &config.ConsulSDConfig{
 		Services: []string{"configuredServiceName"}}
-	consulDiscovery, err := NewDiscovery(conf)
+	consulDiscovery, err := NewDiscovery(conf, log.Base())
 
 	if err != nil {
 		t.Errorf("Unexpected error when initialising discovery %v", err)
@@ -37,7 +38,7 @@ func TestConfiguredService(t *testing.T) {
 
 func TestNonConfiguredService(t *testing.T) {
 	conf := &config.ConsulSDConfig{}
-	consulDiscovery, err := NewDiscovery(conf)
+	consulDiscovery, err := NewDiscovery(conf, log.Base())
 
 	if err != nil {
 		t.Errorf("Unexpected error when initialising discovery %v", err)
diff --git a/discovery/discovery.go b/discovery/discovery.go
index 91d7eb7ab5..43485178a9 100644
--- a/discovery/discovery.go
+++ b/discovery/discovery.go
@@ -50,7 +50,7 @@ type TargetProvider interface {
 }
 
 // ProvidersFromConfig returns all TargetProviders configured in cfg.
-func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]TargetProvider {
+func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig, logger log.Logger) map[string]TargetProvider {
 	providers := map[string]TargetProvider{}
 
 	app := func(mech string, i int, tp TargetProvider) {
@@ -58,59 +58,59 @@ func ProvidersFromConfig(cfg config.ServiceDiscoveryConfig) map[string]TargetPro
 	}
 
 	for i, c := range cfg.DNSSDConfigs {
-		app("dns", i, dns.NewDiscovery(c))
+		app("dns", i, dns.NewDiscovery(c, logger))
 	}
 	for i, c := range cfg.FileSDConfigs {
-		app("file", i, file.NewDiscovery(c))
+		app("file", i, file.NewDiscovery(c, logger))
 	}
 	for i, c := range cfg.ConsulSDConfigs {
-		k, err := consul.NewDiscovery(c)
+		k, err := consul.NewDiscovery(c, logger)
 		if err != nil {
-			log.Errorf("Cannot create Consul discovery: %s", err)
+			logger.Errorf("Cannot create Consul discovery: %s", err)
 			continue
 		}
 		app("consul", i, k)
 	}
 	for i, c := range cfg.MarathonSDConfigs {
-		m, err := marathon.NewDiscovery(c)
+		m, err := marathon.NewDiscovery(c, logger)
 		if err != nil {
-			log.Errorf("Cannot create Marathon discovery: %s", err)
+			logger.Errorf("Cannot create Marathon discovery: %s", err)
 			continue
 		}
 		app("marathon", i, m)
 	}
 	for i, c := range cfg.KubernetesSDConfigs {
-		k, err := kubernetes.New(log.Base(), c)
+		k, err := kubernetes.New(logger, c)
 		if err != nil {
-			log.Errorf("Cannot create Kubernetes discovery: %s", err)
+			logger.Errorf("Cannot create Kubernetes discovery: %s", err)
 			continue
 		}
 		app("kubernetes", i, k)
 	}
 	for i, c := range cfg.ServersetSDConfigs {
-		app("serverset", i, zookeeper.NewServersetDiscovery(c))
+		app("serverset", i, zookeeper.NewServersetDiscovery(c, logger))
 	}
 	for i, c := range cfg.NerveSDConfigs {
-		app("nerve", i, zookeeper.NewNerveDiscovery(c))
+		app("nerve", i, zookeeper.NewNerveDiscovery(c, logger))
 	}
 	for i, c := range cfg.EC2SDConfigs {
-		app("ec2", i, ec2.NewDiscovery(c))
+		app("ec2", i, ec2.NewDiscovery(c, logger))
 	}
 	for i, c := range cfg.GCESDConfigs {
-		gced, err := gce.NewDiscovery(c)
+		gced, err := gce.NewDiscovery(c, logger)
 		if err != nil {
-			log.Errorf("Cannot initialize GCE discovery: %s", err)
+			logger.Errorf("Cannot initialize GCE discovery: %s", err)
 			continue
 		}
 		app("gce", i, gced)
 	}
 	for i, c := range cfg.AzureSDConfigs {
-		app("azure", i, azure.NewDiscovery(c))
+		app("azure", i, azure.NewDiscovery(c, logger))
 	}
 	for i, c := range cfg.TritonSDConfigs {
-		t, err := triton.New(log.With("sd", "triton"), c)
+		t, err := triton.New(logger.With("sd", "triton"), c)
 		if err != nil {
-			log.Errorf("Cannot create Triton discovery: %s", err)
+			logger.Errorf("Cannot create Triton discovery: %s", err)
 			continue
 		}
 		app("triton", i, t)
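With the extra parameter, ProvidersFromConfig threads a single logger into every SD mechanism it instantiates. A sketch of a call site, assuming an empty discovery config (which simply yields an empty provider map):

```go
package main

import (
	"github.com/prometheus/common/log"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/discovery"
)

func main() {
	logger := log.Base()

	// Real callers pass the service discovery section of a scrape config or
	// an alertmanager config; empty here, so no providers come back.
	var sd config.ServiceDiscoveryConfig

	providers := discovery.ProvidersFromConfig(sd, logger)
	for name := range providers {
		// Key format assumed from the app(mech, i, tp) helper above,
		// e.g. something like "file/0".
		logger.Infof("configured provider %s", name)
	}
}
```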
diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go
index d74b1df5e2..b98191c72f 100644
--- a/discovery/discovery_test.go
+++ b/discovery/discovery_test.go
@@ -16,6 +16,7 @@ package discovery
 import (
 	"testing"
 
+	"github.com/prometheus/common/log"
 	"github.com/prometheus/prometheus/config"
 	"golang.org/x/net/context"
 	yaml "gopkg.in/yaml.v2"
@@ -53,7 +54,7 @@ static_configs:
 
 	go ts.Run(ctx)
 
-	ts.UpdateProviders(ProvidersFromConfig(*cfg))
+	ts.UpdateProviders(ProvidersFromConfig(*cfg, log.Base()))
 	<-called
 
 	verifyPresence(ts.tgroups, "static/0/0", true)
@@ -67,7 +68,7 @@ static_configs:
 		t.Fatalf("Unable to load YAML config sTwo: %s", err)
 	}
 
-	ts.UpdateProviders(ProvidersFromConfig(*cfg))
+	ts.UpdateProviders(ProvidersFromConfig(*cfg, log.Base()))
 	<-called
 
 	verifyPresence(ts.tgroups, "static/0/0", true)
diff --git a/discovery/dns/dns.go b/discovery/dns/dns.go
index d5d506dedc..61ab2dd3a3 100644
--- a/discovery/dns/dns.go
+++ b/discovery/dns/dns.go
@@ -66,10 +66,11 @@ type Discovery struct {
 	interval time.Duration
 	port     int
 	qtype    uint16
+	logger   log.Logger
 }
 
 // NewDiscovery returns a new Discovery which periodically refreshes its targets.
-func NewDiscovery(conf *config.DNSSDConfig) *Discovery {
+func NewDiscovery(conf *config.DNSSDConfig, logger log.Logger) *Discovery {
 	qtype := dns.TypeSRV
 	switch strings.ToUpper(conf.Type) {
 	case "A":
@@ -84,6 +85,7 @@ func NewDiscovery(conf *config.DNSSDConfig) *Discovery {
 		interval: time.Duration(conf.RefreshInterval),
 		qtype:    qtype,
 		port:     conf.Port,
+		logger:   logger,
 	}
 }
 
@@ -112,7 +114,7 @@ func (d *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetGr
 	for _, name := range d.names {
 		go func(n string) {
 			if err := d.refresh(ctx, n, ch); err != nil {
-				log.Errorf("Error refreshing DNS targets: %s", err)
+				d.logger.Errorf("Error refreshing DNS targets: %s", err)
 			}
 			wg.Done()
 		}(name)
@@ -122,7 +124,7 @@ func (d *Discovery) refreshAll(ctx context.Context, ch chan<- []*config.TargetGr
 }
 
 func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*config.TargetGroup) error {
-	response, err := lookupAll(name, d.qtype)
+	response, err := lookupAll(name, d.qtype, d.logger)
 	dnsSDLookupsCount.Inc()
 	if err != nil {
 		dnsSDLookupFailuresCount.Inc()
@@ -147,7 +149,7 @@ func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*confi
 		case *dns.AAAA:
 			target = hostPort(addr.AAAA.String(), d.port)
 		default:
-			log.Warnf("%q is not a valid SRV record", record)
+			d.logger.Warnf("%q is not a valid SRV record", record)
 			continue
 		}
 
@@ -167,7 +169,7 @@ func (d *Discovery) refresh(ctx context.Context, name string, ch chan<- []*confi
 	return nil
 }
 
-func lookupAll(name string, qtype uint16) (*dns.Msg, error) {
+func lookupAll(name string, qtype uint16, logger log.Logger) (*dns.Msg, error) {
 	conf, err := dns.ClientConfigFromFile(resolvConf)
 	if err != nil {
 		return nil, fmt.Errorf("could not load resolv.conf: %s", err)
@@ -181,7 +183,7 @@ func lookupAll(name string, qtype uint16, logger log.Logger) (*dns.Msg, error) {
 	for _, lname := range conf.NameList(name) {
 		response, err = lookup(lname, qtype, client, servAddr, false)
 		if err != nil {
-			log.
+			logger.
 				With("server", server).
 				With("name", name).
 				With("reason", err).
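The dns.go hunk keeps the fluent style: each With returns a new Logger carrying one more field, and only the final call emits an entry. A self-contained sketch with illustrative values (server, name, and message text are not taken from the patch):

```go
package main

import (
	"errors"

	"github.com/prometheus/common/log"
)

func main() {
	err := errors.New("no such host") // stand-in for a failed lookup

	// Mirrors the chained call in lookupAll above: fields accumulate,
	// one entry is emitted at the end.
	log.Base().
		With("server", "127.0.0.1:53").
		With("name", "example.com").
		With("reason", err).
		Warn("DNS lookup failed.")
}
```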
With("server", server). With("name", name). With("reason", err). diff --git a/discovery/ec2/ec2.go b/discovery/ec2/ec2.go index 948760121f..99cee5be3d 100644 --- a/discovery/ec2/ec2.go +++ b/discovery/ec2/ec2.go @@ -72,10 +72,11 @@ type Discovery struct { interval time.Duration profile string port int + logger log.Logger } // NewDiscovery returns a new EC2Discovery which periodically refreshes its targets. -func NewDiscovery(conf *config.EC2SDConfig) *Discovery { +func NewDiscovery(conf *config.EC2SDConfig, logger log.Logger) *Discovery { creds := credentials.NewStaticCredentials(conf.AccessKey, string(conf.SecretKey), "") if conf.AccessKey == "" && conf.SecretKey == "" { creds = nil @@ -88,6 +89,7 @@ func NewDiscovery(conf *config.EC2SDConfig) *Discovery { profile: conf.Profile, interval: time.Duration(conf.RefreshInterval), port: conf.Port, + logger: logger, } } @@ -99,7 +101,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // Get an initial set right away. tg, err := d.refresh() if err != nil { - log.Error(err) + d.logger.Error(err) } else { select { case ch <- []*config.TargetGroup{tg}: @@ -113,7 +115,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { case <-ticker.C: tg, err := d.refresh() if err != nil { - log.Error(err) + d.logger.Error(err) continue } diff --git a/discovery/file/file.go b/discovery/file/file.go index 7e4d11eb42..b74c6e5102 100644 --- a/discovery/file/file.go +++ b/discovery/file/file.go @@ -63,13 +63,15 @@ type Discovery struct { // and how many target groups they contained. // This is used to detect deleted target groups. lastRefresh map[string]int + logger log.Logger } // NewDiscovery returns a new file discovery for the given paths. -func NewDiscovery(conf *config.FileSDConfig) *Discovery { +func NewDiscovery(conf *config.FileSDConfig, logger log.Logger) *Discovery { return &Discovery{ paths: conf.Files, interval: time.Duration(conf.RefreshInterval), + logger: logger, } } @@ -79,7 +81,7 @@ func (d *Discovery) listFiles() []string { for _, p := range d.paths { files, err := filepath.Glob(p) if err != nil { - log.Errorf("Error expanding glob %q: %s", p, err) + d.logger.Errorf("Error expanding glob %q: %s", p, err) continue } paths = append(paths, files...) @@ -100,7 +102,7 @@ func (d *Discovery) watchFiles() { p = "./" } if err := d.watcher.Add(p); err != nil { - log.Errorf("Error adding file watch for %q: %s", p, err) + d.logger.Errorf("Error adding file watch for %q: %s", p, err) } } } @@ -111,7 +113,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { watcher, err := fsnotify.NewWatcher() if err != nil { - log.Errorf("Error creating file watcher: %s", err) + d.logger.Errorf("Error creating file watcher: %s", err) return } d.watcher = watcher @@ -149,7 +151,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { case err := <-d.watcher.Errors: if err != nil { - log.Errorf("Error on file watch: %s", err) + d.logger.Errorf("Error on file watch: %s", err) } } } @@ -157,7 +159,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) { // stop shuts down the file watcher. 
diff --git a/discovery/file/file_test.go b/discovery/file/file_test.go
index fb10fa9dd7..f977e3b228 100644
--- a/discovery/file/file_test.go
+++ b/discovery/file/file_test.go
@@ -20,6 +20,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/prometheus/common/log"
 	"github.com/prometheus/common/model"
 	"golang.org/x/net/context"
 
@@ -41,7 +42,7 @@ func testFileSD(t *testing.T, ext string) {
 	conf.RefreshInterval = model.Duration(1 * time.Hour)
 
 	var (
-		fsd         = NewDiscovery(&conf)
+		fsd         = NewDiscovery(&conf, log.Base())
 		ch          = make(chan []*config.TargetGroup)
 		ctx, cancel = context.WithCancel(context.Background())
 	)
diff --git a/discovery/gce/gce.go b/discovery/gce/gce.go
index 44ac349fea..2b6df1b916 100644
--- a/discovery/gce/gce.go
+++ b/discovery/gce/gce.go
@@ -76,10 +76,11 @@ type Discovery struct {
 	interval     time.Duration
 	port         int
 	tagSeparator string
+	logger       log.Logger
 }
 
 // NewDiscovery returns a new Discovery which periodically refreshes its targets.
-func NewDiscovery(conf *config.GCESDConfig) (*Discovery, error) {
+func NewDiscovery(conf *config.GCESDConfig, logger log.Logger) (*Discovery, error) {
 	gd := &Discovery{
 		project:  conf.Project,
 		zone:     conf.Zone,
@@ -87,6 +88,7 @@ func NewDiscovery(conf *config.GCESDConfig) (*Discovery, error) {
 		interval:     time.Duration(conf.RefreshInterval),
 		port:         conf.Port,
 		tagSeparator: conf.TagSeparator,
+		logger:       logger,
 	}
 	var err error
 	gd.client, err = google.DefaultClient(oauth2.NoContext, compute.ComputeReadonlyScope)
@@ -106,7 +108,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 	// Get an initial set right away.
 	tg, err := d.refresh()
 	if err != nil {
-		log.Error(err)
+		d.logger.Error(err)
 	} else {
 		select {
 		case ch <- []*config.TargetGroup{tg}:
@@ -122,7 +124,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 		case <-ticker.C:
 			tg, err := d.refresh()
 			if err != nil {
-				log.Error(err)
+				d.logger.Error(err)
 				continue
 			}
 			select {
diff --git a/discovery/marathon/marathon.go b/discovery/marathon/marathon.go
index 2c8ddd4734..d1f2b2f6cd 100644
--- a/discovery/marathon/marathon.go
+++ b/discovery/marathon/marathon.go
@@ -85,10 +85,11 @@ type Discovery struct {
 	lastRefresh     map[string]*config.TargetGroup
 	appsClient      AppListClient
 	token           string
+	logger          log.Logger
 }
 
 // NewDiscovery returns a new Marathon Discovery.
-func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) {
+func NewDiscovery(conf *config.MarathonSDConfig, logger log.Logger) (*Discovery, error) {
 	tls, err := httputil.NewTLSConfig(conf.TLSConfig)
 	if err != nil {
 		return nil, err
@@ -116,6 +117,7 @@ func NewDiscovery(conf *config.MarathonSDConfig) (*Discovery, error) {
 		refreshInterval: time.Duration(conf.RefreshInterval),
 		appsClient:      fetchApps,
 		token:           token,
+		logger:          logger,
 	}, nil
 }
 
@@ -128,7 +130,7 @@ func (d *Discovery) Run(ctx context.Context, ch chan<- []*config.TargetGroup) {
 		case <-time.After(d.refreshInterval):
 			err := d.updateServices(ctx, ch)
 			if err != nil {
-				log.Errorf("Error while updating services: %s", err)
+				d.logger.Errorf("Error while updating services: %s", err)
 			}
 		}
 	}
@@ -167,7 +169,7 @@ func (d *Discovery) updateServices(ctx context.Context, ch chan<- []*config.Targ
 			case <-ctx.Done():
 				return ctx.Err()
 			case ch <- []*config.TargetGroup{{Source: source}}:
-				log.Debugf("Removing group for %s", source)
+				d.logger.Debugf("Removing group for %s", source)
 			}
 		}
 	}
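The Debugf in updateServices sits on the retraction path: sending a TargetGroup with a known Source but no Targets tells the consumer to drop that group. A sketch of that contract, with a hypothetical consumer keyed by Source:

```go
package main

import (
	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/config"
)

func main() {
	logger := log.Base()
	groups := map[string]*config.TargetGroup{} // hypothetical consumer state

	apply := func(tg *config.TargetGroup) {
		if len(tg.Targets) == 0 {
			// An empty group retracts an earlier announcement.
			logger.Debugf("Removing group for %s", tg.Source)
			delete(groups, tg.Source)
			return
		}
		groups[tg.Source] = tg
	}

	// Announce a group, then retract it with an empty group of the same Source.
	apply(&config.TargetGroup{
		Source:  "marathon/app-web", // illustrative Source name
		Targets: []model.LabelSet{{model.AddressLabel: "10.0.0.1:8080"}},
	})
	apply(&config.TargetGroup{Source: "marathon/app-web"})

	logger.Infof("%d groups left", len(groups)) // prints 0
}
```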
diff --git a/discovery/marathon/marathon_test.go b/discovery/marathon/marathon_test.go
index 913380ad61..95e420dd88 100644
--- a/discovery/marathon/marathon_test.go
+++ b/discovery/marathon/marathon_test.go
@@ -19,6 +19,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/prometheus/common/log"
 	"github.com/prometheus/common/model"
 	"golang.org/x/net/context"
 
@@ -32,7 +33,7 @@ var (
 )
 
 func testUpdateServices(client AppListClient, ch chan []*config.TargetGroup) error {
-	md, err := NewDiscovery(&conf)
+	md, err := NewDiscovery(&conf, log.Base())
 	if err != nil {
 		return err
 	}
@@ -140,7 +141,7 @@ func TestMarathonSDSendGroup(t *testing.T) {
 func TestMarathonSDRemoveApp(t *testing.T) {
 	var ch = make(chan []*config.TargetGroup, 1)
 
-	md, err := NewDiscovery(&conf)
+	md, err := NewDiscovery(&conf, log.Base())
 	if err != nil {
 		t.Fatalf("%s", err)
 	}
@@ -176,7 +177,7 @@ func TestMarathonSDRunAndStop(t *testing.T) {
 		ch     = make(chan []*config.TargetGroup)
 		doneCh = make(chan error)
 	)
-	md, err := NewDiscovery(&conf)
+	md, err := NewDiscovery(&conf, log.Base())
 	if err != nil {
 		t.Fatalf("%s", err)
 	}
diff --git a/discovery/zookeeper/zookeeper.go b/discovery/zookeeper/zookeeper.go
index 0b8f1943b2..ba6e9d5fa3 100644
--- a/discovery/zookeeper/zookeeper.go
+++ b/discovery/zookeeper/zookeeper.go
@@ -24,6 +24,7 @@ import (
 	"github.com/samuel/go-zookeeper/zk"
 	"golang.org/x/net/context"
 
+	"github.com/prometheus/common/log"
 	"github.com/prometheus/prometheus/config"
 	"github.com/prometheus/prometheus/util/strutil"
 	"github.com/prometheus/prometheus/util/treecache"
@@ -39,17 +40,18 @@ type Discovery struct {
 	updates    chan treecache.ZookeeperTreeCacheEvent
 	treeCaches []*treecache.ZookeeperTreeCache
 
-	parse func(data []byte, path string) (model.LabelSet, error)
+	parse  func(data []byte, path string) (model.LabelSet, error)
+	logger log.Logger
 }
 
 // NewNerveDiscovery returns a new Discovery for the given Nerve config.
-func NewNerveDiscovery(conf *config.NerveSDConfig) *Discovery {
-	return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseNerveMember)
+func NewNerveDiscovery(conf *config.NerveSDConfig, logger log.Logger) *Discovery {
+	return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseNerveMember)
 }
 
 // NewServersetDiscovery returns a new Discovery for the given serverset config.
-func NewServersetDiscovery(conf *config.ServersetSDConfig) *Discovery {
-	return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, parseServersetMember)
+func NewServersetDiscovery(conf *config.ServersetSDConfig, logger log.Logger) *Discovery {
+	return NewDiscovery(conf.Servers, time.Duration(conf.Timeout), conf.Paths, logger, parseServersetMember)
 }
 
 // NewDiscovery returns a new discovery along Zookeeper parses with
@@ -58,6 +60,7 @@ func NewDiscovery(
 	srvs []string,
 	timeout time.Duration,
 	paths []string,
+	logger log.Logger,
 	pf func(data []byte, path string) (model.LabelSet, error),
 ) *Discovery {
 	conn, _, err := zk.Connect(srvs, timeout)
@@ -71,6 +74,7 @@ func NewDiscovery(
 		updates: updates,
 		sources: map[string]*config.TargetGroup{},
 		parse:   pf,
+		logger:  logger,
 	}
 	for _, path := range paths {
 		sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates))
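zookeeper.NewDiscovery is the generic core that both wrappers specialise; note the new parameter order, with the logger before the parse callback. A sketch with a hypothetical parse function and ZooKeeper address (constructing the Discovery will start background connection attempts):

```go
package main

import (
	"time"

	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/discovery/zookeeper"
)

func main() {
	// Hypothetical parse function; parseNerveMember and parseServersetMember
	// play this role in the wrappers above.
	parse := func(data []byte, path string) (model.LabelSet, error) {
		return model.LabelSet{"zk_path": model.LabelValue(path)}, nil
	}

	d := zookeeper.NewDiscovery(
		[]string{"localhost:2181"}, // illustrative server list
		10*time.Second,
		[]string{"/services"},
		log.Base(), // logger now precedes the parse callback
		parse,
	)
	_ = d
}
```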
diff --git a/notifier/notifier.go b/notifier/notifier.go
index baa2f456ec..f999feb288 100644
--- a/notifier/notifier.go
+++ b/notifier/notifier.go
@@ -65,6 +65,7 @@ type Notifier struct {
 
 	alertmanagers   []*alertmanagerSet
 	cancelDiscovery func()
+	logger          log.Logger
 }
 
 // Options are the configurable parameters of a Handler.
@@ -156,7 +157,7 @@ func newAlertMetrics(r prometheus.Registerer, queueCap int, queueLen, alertmanag
 }
 
 // New constructs a new Notifier.
-func New(o *Options) *Notifier {
+func New(o *Options, logger log.Logger) *Notifier {
 	ctx, cancel := context.WithCancel(context.Background())
 
 	if o.Do == nil {
@@ -169,6 +170,7 @@ func New(o *Options) *Notifier {
 		cancel: cancel,
 		more:   make(chan struct{}, 1),
 		opts:   o,
+		logger: logger,
 	}
 
 	queueLenFunc := func() float64 { return float64(n.queueLen()) }
@@ -189,7 +191,7 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
 	ctx, cancel := context.WithCancel(n.ctx)
 
 	for _, cfg := range conf.AlertingConfig.AlertmanagerConfigs {
-		ams, err := newAlertmanagerSet(cfg)
+		ams, err := newAlertmanagerSet(cfg, n.logger)
 		if err != nil {
 			return err
 		}
@@ -203,7 +205,7 @@ func (n *Notifier) ApplyConfig(conf *config.Config) error {
 	// old ones.
 	for _, ams := range amSets {
 		go ams.ts.Run(ctx)
-		ams.ts.UpdateProviders(discovery.ProvidersFromConfig(ams.cfg.ServiceDiscoveryConfig))
+		ams.ts.UpdateProviders(discovery.ProvidersFromConfig(ams.cfg.ServiceDiscoveryConfig, n.logger))
 	}
 	if n.cancelDiscovery != nil {
 		n.cancelDiscovery()
@@ -283,7 +285,7 @@ func (n *Notifier) Send(alerts ...*model.Alert) {
 
 	if d := len(alerts) - n.opts.QueueCapacity; d > 0 {
 		alerts = alerts[d:]
-		log.Warnf("Alert batch larger than queue capacity, dropping %d alerts", d)
+		n.logger.Warnf("Alert batch larger than queue capacity, dropping %d alerts", d)
 		n.metrics.dropped.Add(float64(d))
 	}
 
@@ -292,7 +294,7 @@ func (n *Notifier) Send(alerts ...*model.Alert) {
 	if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 {
 		n.queue = n.queue[d:]
 
-		log.Warnf("Alert notification queue full, dropping %d alerts", d)
+		n.logger.Warnf("Alert notification queue full, dropping %d alerts", d)
 		n.metrics.dropped.Add(float64(d))
 	}
 	n.queue = append(n.queue, alerts...)
@@ -349,7 +351,7 @@ func (n *Notifier) sendAll(alerts ...*model.Alert) bool {
 
 	b, err := json.Marshal(alerts)
 	if err != nil {
-		log.Errorf("Encoding alerts failed: %s", err)
+		n.logger.Errorf("Encoding alerts failed: %s", err)
 		return false
 	}
 
@@ -374,7 +376,7 @@ func (n *Notifier) sendAll(alerts ...*model.Alert) bool {
 			u := am.url().String()
 
 			if err := n.sendOne(ctx, ams.client, u, b); err != nil {
-				log.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
+				n.logger.With("alertmanager", u).With("count", len(alerts)).Errorf("Error sending alerts: %s", err)
 				n.metrics.errors.WithLabelValues(u).Inc()
 			} else {
 				atomic.AddUint64(&numSuccess, 1)
@@ -413,7 +415,7 @@ func (n *Notifier) sendOne(ctx context.Context, c *http.Client, url string, b []
 
 // Stop shuts down the notification handler.
 func (n *Notifier) Stop() {
-	log.Info("Stopping notification handler...")
+	n.logger.Info("Stopping notification handler...")
 	n.cancel()
 }
 
@@ -443,11 +445,12 @@ type alertmanagerSet struct {
 	metrics *alertMetrics
 
-	mtx sync.RWMutex
-	ams []alertmanager
+	mtx    sync.RWMutex
+	ams    []alertmanager
+	logger log.Logger
 }
 
-func newAlertmanagerSet(cfg *config.AlertmanagerConfig) (*alertmanagerSet, error) {
+func newAlertmanagerSet(cfg *config.AlertmanagerConfig, logger log.Logger) (*alertmanagerSet, error) {
 	client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
 	if err != nil {
 		return nil, err
 	}
@@ -455,6 +458,7 @@ func newAlertmanagerSet(cfg *config.AlertmanagerConfig) (*alertmanagerSet, error
 	s := &alertmanagerSet{
 		client: client,
 		cfg:    cfg,
+		logger: logger,
 	}
 	s.ts = discovery.NewTargetSet(s)
 
@@ -469,7 +473,7 @@ func (s *alertmanagerSet) Sync(tgs []*config.TargetGroup) {
 	for _, tg := range tgs {
 		ams, err := alertmanagerFromGroup(tg, s.cfg)
 		if err != nil {
-			log.With("err", err).Error("generating discovered Alertmanagers failed")
+			s.logger.With("err", err).Error("generating discovered Alertmanagers failed")
 			continue
 		}
 		all = append(all, ams...)
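Both Warnf sites in Send implement one drop policy: compute the overflow d and shed the d oldest entries before appending. A self-contained sketch of that arithmetic, with strings standing in for *model.Alert and a capacity of 3:

```go
package main

import "github.com/prometheus/common/log"

func main() {
	logger := log.Base()

	queue := []string{"a", "b", "c"} // stand-ins for *model.Alert
	incoming := []string{"d", "e"}
	capacity := 3 // plays the role of Options.QueueCapacity

	// d = (3 + 2) - 3 = 2, so the two oldest entries are dropped.
	if d := (len(queue) + len(incoming)) - capacity; d > 0 {
		queue = queue[d:] // oldest entries go first
		logger.Warnf("Alert notification queue full, dropping %d alerts", d)
	}
	queue = append(queue, incoming...)

	logger.Infof("queue now holds %d alerts: %v", len(queue), queue) // [c d e]
}
```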
diff --git a/notifier/notifier_test.go b/notifier/notifier_test.go
index 2ff487bad5..8253bb3d00 100644
--- a/notifier/notifier_test.go
+++ b/notifier/notifier_test.go
@@ -26,6 +26,7 @@ import (
 
 	"golang.org/x/net/context"
 
+	"github.com/prometheus/common/log"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/config"
 )
@@ -63,7 +64,7 @@ func TestPostPath(t *testing.T) {
 }
 
 func TestHandlerNextBatch(t *testing.T) {
-	h := New(&Options{})
+	h := New(&Options{}, log.Base())
 
 	for i := range make([]struct{}, 2*maxBatchSize+1) {
 		h.queue = append(h.queue, &model.Alert{
@@ -150,7 +151,7 @@ func TestHandlerSendAll(t *testing.T) {
 	defer server1.Close()
 	defer server2.Close()
 
-	h := New(&Options{})
+	h := New(&Options{}, log.Base())
 	h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
 		ams: []alertmanager{
 			alertmanagerMock{
@@ -217,7 +218,7 @@ func TestCustomDo(t *testing.T) {
 				Body: ioutil.NopCloser(nil),
 			}, nil
 		},
-	})
+	}, log.Base())
 
 	h.sendOne(context.Background(), nil, testURL, []byte(testBody))
 
@@ -239,7 +240,7 @@ func TestExternalLabels(t *testing.T) {
 				Replacement: "c",
 			},
 		},
-	})
+	}, log.Base())
 
 	// This alert should get the external label attached.
 	h.Send(&model.Alert{
@@ -293,7 +294,7 @@ func TestHandlerRelabel(t *testing.T) {
 				Replacement: "renamed",
 			},
 		},
-	})
+	}, log.Base())
 
 	// This alert should be dropped due to the configuration
 	h.Send(&model.Alert{
@@ -347,7 +348,9 @@ func TestHandlerQueueing(t *testing.T) {
 
 	h := New(&Options{
 		QueueCapacity: 3 * maxBatchSize,
-	})
+	},
+		log.Base(),
+	)
 	h.alertmanagers = append(h.alertmanagers, &alertmanagerSet{
 		ams: []alertmanager{
 			alertmanagerMock{
diff --git a/retrieval/targetmanager.go b/retrieval/targetmanager.go
index 1798355a5e..791d59a055 100644
--- a/retrieval/targetmanager.go
+++ b/retrieval/targetmanager.go
@@ -38,6 +38,7 @@ type TargetManager struct {
 
 	// Set of unqiue targets by scrape configuration.
 	targetSets map[string]*targetSet
+	logger     log.Logger
 }
 
 type targetSet struct {
@@ -49,16 +50,17 @@ type targetSet struct {
 }
 
 // NewTargetManager creates a new TargetManager.
-func NewTargetManager(app storage.SampleAppender) *TargetManager {
+func NewTargetManager(app storage.SampleAppender, logger log.Logger) *TargetManager {
 	return &TargetManager{
 		appender:   app,
 		targetSets: map[string]*targetSet{},
+		logger:     logger,
 	}
 }
 
 // Run starts background processing to handle target updates.
 func (tm *TargetManager) Run() {
-	log.Info("Starting target manager...")
+	tm.logger.Info("Starting target manager...")
 
 	tm.mtx.Lock()
 
@@ -72,7 +74,7 @@ func (tm *TargetManager) Run() {
 
 // Stop all background processing.
 func (tm *TargetManager) Stop() {
-	log.Infoln("Stopping target manager...")
+	tm.logger.Infoln("Stopping target manager...")
 	tm.mtx.Lock()
 
 	// Cancel the base context, this will cause all target providers to shut down
@@ -84,7 +86,7 @@ func (tm *TargetManager) Stop() {
 	// Wait for all scrape inserts to complete.
 	tm.wg.Wait()
 
-	log.Debugln("Target manager stopped")
+	tm.logger.Debugln("Target manager stopped")
 }
 
 func (tm *TargetManager) reload() {
@@ -118,7 +120,7 @@ func (tm *TargetManager) reload() {
 		} else {
 			ts.sp.reload(scfg)
 		}
-		ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig))
+		ts.ts.UpdateProviders(discovery.ProvidersFromConfig(scfg.ServiceDiscoveryConfig, tm.logger))
 	}
 
 	// Remove old target sets. Waiting for scrape pools to complete pending
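retrieval.NewTargetManager follows the same pattern as notifier.New: the logger arrives as the final constructor argument and lifecycle messages flow through it. A wiring-only sketch, assuming the post-patch signature; the nil SampleAppender means nothing may actually be scraped with this instance:

```go
package main

import (
	"github.com/prometheus/common/log"

	"github.com/prometheus/prometheus/retrieval"
	"github.com/prometheus/prometheus/storage"
)

func main() {
	var app storage.SampleAppender // nil: wiring sketch only, never scrape with this

	// The "component" field is illustrative, not part of the patch.
	tm := retrieval.NewTargetManager(app, log.Base().With("component", "targetmanager"))

	tm.Run()  // "Starting target manager..." now goes through the injected logger
	tm.Stop() // likewise "Stopping target manager..."
}
```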