diff --git a/main.go b/main.go index fd7b04052e..9d751d49ab 100644 --- a/main.go +++ b/main.go @@ -30,6 +30,8 @@ import ( "github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/storage/metric" + "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/storage/remote/opentsdb" "github.com/prometheus/prometheus/web" "github.com/prometheus/prometheus/web/api" ) @@ -43,6 +45,9 @@ var ( alertmanagerUrl = flag.String("alertmanager.url", "", "The URL of the alert manager to send notifications to.") + remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.") + remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.") + samplesQueueCapacity = flag.Int("storage.queue.samplesCapacity", 4096, "The size of the unwritten samples queue.") diskAppendQueueCapacity = flag.Int("storage.queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.") memoryAppendQueueCapacity = flag.Int("storage.queue.memoryAppendCapacity", 10000, "The size of the queue for items that are pending writing to memory.") @@ -84,10 +89,11 @@ type prometheus struct { unwrittenSamples chan *extraction.Result - ruleManager rules.RuleManager - targetManager retrieval.TargetManager - notifications chan notification.NotificationReqs - storage *metric.TieredStorage + ruleManager rules.RuleManager + targetManager retrieval.TargetManager + notifications chan notification.NotificationReqs + storage *metric.TieredStorage + remoteTSDBQueue *remote.TSDBQueueManager curationState metric.CurationStateUpdater } @@ -186,6 +192,10 @@ func (p *prometheus) close() { p.storage.Close() + if p.remoteTSDBQueue != nil { + p.remoteTSDBQueue.Close() + } + close(p.notifications) close(p.stopBackgroundOperations) } @@ -212,6 +222,15 @@ func main() { glog.Fatal("Error opening storage: ", err) } + var remoteTSDBQueue *remote.TSDBQueueManager = nil + if *remoteTSDBUrl == "" { + glog.Warningf("No TSDB URL provided; not sending any samples to long-term storage") + } else { + openTSDB := opentsdb.NewClient(*remoteTSDBUrl, *remoteTSDBTimeout) + remoteTSDBQueue = remote.NewTSDBQueueManager(openTSDB, 512) + go remoteTSDBQueue.Run() + } + unwrittenSamples := make(chan *extraction.Result, *samplesQueueCapacity) ingester := &retrieval.MergeLabelsIngester{ Labels: conf.GlobalLabels(), @@ -298,10 +317,11 @@ func main() { stopBackgroundOperations: make(chan bool, 1), - ruleManager: ruleManager, - targetManager: targetManager, - notifications: notifications, - storage: ts, + ruleManager: ruleManager, + targetManager: targetManager, + notifications: notifications, + storage: ts, + remoteTSDBQueue: remoteTSDBQueue, } defer prometheus.close() @@ -368,8 +388,11 @@ func main() { // TODO(all): Migrate this into prometheus.serve(). for block := range unwrittenSamples { - if block.Err == nil { + if block.Err == nil && len(block.Samples) > 0 { ts.AppendSamples(block.Samples) + if remoteTSDBQueue != nil { + remoteTSDBQueue.Queue(block.Samples) + } } } } diff --git a/storage/remote/instrumentation.go b/storage/remote/instrumentation.go new file mode 100644 index 0000000000..0947cb2aa8 --- /dev/null +++ b/storage/remote/instrumentation.go @@ -0,0 +1,55 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + result = "result" + success = "success" + failure = "failure" + dropped = "dropped" + + facet = "facet" + occupancy = "occupancy" + capacity = "capacity" +) + +var ( + samplesCount = prometheus.NewCounter() + sendLatency = prometheus.NewDefaultHistogram() + queueSize = prometheus.NewGauge() +) + +func recordOutcome(duration time.Duration, sampleCount int, err error) { + labels := map[string]string{result: success} + if err != nil { + labels[result] = failure + } + + samplesCount.IncrementBy(labels, float64(sampleCount)) + ms := float64(duration / time.Millisecond) + sendLatency.Add(labels, ms) +} + +func init() { + prometheus.Register("prometheus_remote_tsdb_sent_samples_total", "Total number of samples processed to be sent to remote TSDB.", prometheus.NilLabels, samplesCount) + + prometheus.Register("prometheus_remote_tsdb_latency_ms", "Latency quantiles for sending samples to the remote TSDB in milliseconds.", prometheus.NilLabels, sendLatency) + prometheus.Register("prometheus_remote_tsdb_queue_size_total", "The size and capacity of the queue of samples to be sent to the remote TSDB.", prometheus.NilLabels, queueSize) +} diff --git a/storage/remote/opentsdb/client.go b/storage/remote/opentsdb/client.go new file mode 100644 index 0000000000..04894c678c --- /dev/null +++ b/storage/remote/opentsdb/client.go @@ -0,0 +1,120 @@ +package opentsdb + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "regexp" + "time" + + clientmodel "github.com/prometheus/client_golang/model" + + "github.com/prometheus/prometheus/utility" +) + +const ( + putEndpoint = "/api/put" + contentTypeJson = "application/json" +) + +var ( + illegalCharsRE = regexp.MustCompile(`[^a-zA-Z0-9_\-./]`) +) + +// Client allows sending batches of Prometheus samples to OpenTSDB. +type Client struct { + url string + httpClient *http.Client +} + +// Create a new Client. +func NewClient(url string, timeout time.Duration) *Client { + return &Client{ + url: url, + httpClient: utility.NewDeadlineClient(timeout), + } +} + +// StoreSamplesRequest is used for building a JSON request for storing samples +// via the OpenTSDB. +type StoreSamplesRequest struct { + Metric string `json:"metric"` + Timestamp int64 `json:"timestamp"` + Value clientmodel.SampleValue `json:"value"` + Tags map[string]string `json:"tags"` +} + +// Escape Prometheus label values to valid tag values for OpenTSDB. +func escapeTagValue(l clientmodel.LabelValue) string { + return illegalCharsRE.ReplaceAllString(string(l), "_") +} + +// Translate Prometheus metric into OpenTSDB tags. +func tagsFromMetric(m clientmodel.Metric) map[string]string { + tags := make(map[string]string, len(m)-1) + for l, v := range m { + if l == clientmodel.MetricNameLabel { + continue + } + tags[string(l)] = escapeTagValue(v) + } + return tags +} + +// Send a batch of samples to OpenTSDB via its HTTP API. +func (c *Client) Store(samples clientmodel.Samples) error { + reqs := make([]StoreSamplesRequest, 0, len(samples)) + for _, s := range samples { + metric := escapeTagValue(s.Metric[clientmodel.MetricNameLabel]) + reqs = append(reqs, StoreSamplesRequest{ + Metric: metric, + Timestamp: s.Timestamp.Unix(), + Value: s.Value, + Tags: tagsFromMetric(s.Metric), + }) + } + + u, err := url.Parse(c.url) + if err != nil { + return err + } + + u.Path = putEndpoint + + buf, err := json.Marshal(reqs) + if err != nil { + return err + } + + resp, err := c.httpClient.Post( + u.String(), + contentTypeJson, + bytes.NewBuffer(buf), + ) + if err != nil { + return err + } + defer resp.Body.Close() + + // API returns status code 204 for successful writes. + // http://opentsdb.net/docs/build/html/api_http/put.html + if resp.StatusCode == http.StatusNoContent { + return nil + } + + // API returns status code 400 on error, encoding error details in the + // response content in JSON. + buf, err = ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + var r map[string]int + if err := json.Unmarshal(buf, &r); err != nil { + return err + } + return fmt.Errorf("Failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"]) +} diff --git a/storage/remote/opentsdb/client_test.go b/storage/remote/opentsdb/client_test.go new file mode 100644 index 0000000000..aa7f56f192 --- /dev/null +++ b/storage/remote/opentsdb/client_test.go @@ -0,0 +1,41 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opentsdb + +import ( + "testing" + + clientmodel "github.com/prometheus/client_golang/model" +) + +func TestTagsFromMetric(t *testing.T) { + input := clientmodel.Metric{ + clientmodel.MetricNameLabel: "testmetric", + "test:label": "test:value", + "many_chars": "abc!ABC:012-3!45รถ67~89./", + } + expected := map[string]string{ + "test:label": "test_value", + "many_chars": "abc_ABC_012-3_45_67_89./", + } + actual := tagsFromMetric(input) + if len(actual) != len(expected) { + t.Fatalf("Expected %v, got %v", expected, actual) + } + for k, v := range expected { + if v != actual[k] { + t.Fatalf("Expected %s => %s, got %s => %s", k, v, k, actual[k]) + } + } +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go new file mode 100644 index 0000000000..0bd2e0c728 --- /dev/null +++ b/storage/remote/queue_manager.go @@ -0,0 +1,148 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "time" + + "github.com/golang/glog" + + clientmodel "github.com/prometheus/client_golang/model" +) + +const ( + // The maximum number of concurrent send requests to the TSDB. + maxConcurrentSends = 10 + // The maximum number of samples to fit into a single request to the TSDB. + maxSamplesPerSend = 100 + // The deadline after which to send queued samples even if the maximum batch + // size has not been reached. + batchSendDeadline = 5 * time.Second +) + +// TSDBClient defines an interface for sending a batch of samples to an +// external timeseries database (TSDB). +type TSDBClient interface { + Store(clientmodel.Samples) error +} + +// TSDBQueueManager manages a queue of samples to be sent to the TSDB indicated +// by the provided TSDBClient. +type TSDBQueueManager struct { + tsdb TSDBClient + queue chan clientmodel.Samples + pendingSamples clientmodel.Samples + sendSemaphore chan bool + drained chan bool +} + +// Build a new TSDBQueueManager. +func NewTSDBQueueManager(tsdb TSDBClient, queueCapacity int) *TSDBQueueManager { + return &TSDBQueueManager{ + tsdb: tsdb, + queue: make(chan clientmodel.Samples, queueCapacity), + sendSemaphore: make(chan bool, maxConcurrentSends), + drained: make(chan bool), + } +} + +// Queue a sample batch to be sent to the TSDB. This drops the most recently +// queued samples on the floor if the queue is full. +func (t *TSDBQueueManager) Queue(s clientmodel.Samples) { + select { + case t.queue <- s: + default: + samplesCount.IncrementBy(map[string]string{result: dropped}, float64(len(s))) + glog.Warningf("TSDB queue full, discarding %d samples", len(s)) + } +} + +func (t *TSDBQueueManager) sendSamples(s clientmodel.Samples) { + t.sendSemaphore <- true + defer func() { + <-t.sendSemaphore + }() + + // Samples are sent to the TSDB on a best-effort basis. If a sample isn't + // sent correctly the first time, it's simply dropped on the floor. + begin := time.Now() + err := t.tsdb.Store(s) + recordOutcome(time.Since(begin), len(s), err) + + if err != nil { + glog.Warningf("error sending %d samples to TSDB: %s", len(s), err) + } +} + +// Report notification queue occupancy and capacity. +func (t *TSDBQueueManager) reportQueues() { + queueSize.Set(map[string]string{facet: occupancy}, float64(len(t.queue))) + queueSize.Set(map[string]string{facet: capacity}, float64(cap(t.queue))) +} + +// Continuously send samples to the TSDB. +func (t *TSDBQueueManager) Run() { + defer func() { + close(t.drained) + }() + + queueReportTicker := time.NewTicker(time.Second) + go func() { + for _ = range queueReportTicker.C { + t.reportQueues() + } + }() + defer queueReportTicker.Stop() + + // Send batches of at most maxSamplesPerSend samples to the TSDB. If we + // have fewer samples than that, flush them out after a deadline anyways. + for { + select { + case s, ok := <-t.queue: + if !ok { + glog.Infof("Flushing %d samples to OpenTSDB...", len(t.pendingSamples)) + t.flush() + glog.Infof("Done flushing.") + return + } + + t.pendingSamples = append(t.pendingSamples, s...) + + for len(t.pendingSamples) >= maxSamplesPerSend { + go t.sendSamples(t.pendingSamples[:maxSamplesPerSend]) + t.pendingSamples = t.pendingSamples[maxSamplesPerSend:] + } + case <-time.After(batchSendDeadline): + t.flush() + } + } +} + +// Flush remaining queued samples. +func (t *TSDBQueueManager) flush() { + if len(t.pendingSamples) > 0 { + go t.sendSamples(t.pendingSamples) + } + t.pendingSamples = t.pendingSamples[:0] +} + +// Stop sending samples to the TSDB and wait for pending sends to complete. +func (t *TSDBQueueManager) Close() { + glog.Infof("TSDB queue manager shutting down...") + close(t.queue) + <-t.drained + for i := 0; i < maxConcurrentSends; i++ { + t.sendSemaphore <- true + } +} diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go new file mode 100644 index 0000000000..f14d06cb27 --- /dev/null +++ b/storage/remote/queue_manager_test.go @@ -0,0 +1,77 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "sync" + "testing" + + clientmodel "github.com/prometheus/client_golang/model" +) + +type TestTSDBClient struct { + receivedSamples clientmodel.Samples + expectedSamples clientmodel.Samples + wg sync.WaitGroup +} + +func (c *TestTSDBClient) expectSamples(s clientmodel.Samples) { + c.expectedSamples = append(c.expectedSamples, s...) + c.wg.Add(len(s)) +} + +func (c *TestTSDBClient) waitForExpectedSamples(t *testing.T) { + c.wg.Wait() + for i, expected := range c.expectedSamples { + if !expected.Equal(c.receivedSamples[i]) { + t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[i]) + } + } +} + +func (c *TestTSDBClient) Store(s clientmodel.Samples) error { + c.receivedSamples = append(c.receivedSamples, s...) + c.wg.Add(-len(s)) + return nil +} + +func TestSampleDelivery(t *testing.T) { + // Let's create an even number of send batches so we don't run into the + // batch timeout case. + n := maxSamplesPerSend * 2 + + samples := make(clientmodel.Samples, 0, n) + for i := 0; i < n; i++ { + samples = append(samples, &clientmodel.Sample{ + Metric: clientmodel.Metric{ + clientmodel.MetricNameLabel: "test_metric", + }, + Value: clientmodel.SampleValue(i), + }) + } + + c := &TestTSDBClient{} + c.expectSamples(samples[:len(samples)/2]) + m := NewTSDBQueueManager(c, 1) + + // These should be received by the client. + m.Queue(samples[:len(samples)/2]) + // These will be dropped because the queue is full. + m.Queue(samples[len(samples)/2:]) + + go m.Run() + defer m.Close() + + c.waitForExpectedSamples(t) +}