diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 86050f824d..573c906e43 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -40,6 +40,8 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/retrieval" "github.com/prometheus/prometheus/rules" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/storage/tsdb" "github.com/prometheus/prometheus/web" ) @@ -221,20 +223,20 @@ func main() { } logger.Infoln("tsdb started") - // remoteStorage := &remote.Storage{} - // sampleAppender = append(sampleAppender, remoteStorage) - // reloadables = append(reloadables, remoteStorage) + remoteStorage := &remote.Storage{} + reloadables = append(reloadables, remoteStorage) + fanoutStorage := storage.NewFanout(tsdb.Adapter(localStorage), remoteStorage) cfg.queryEngine.Logger = logger var ( notifier = notifier.New(&cfg.notifier, logger) - targetManager = retrieval.NewTargetManager(tsdb.Adapter(localStorage), logger) - queryEngine = promql.NewEngine(tsdb.Adapter(localStorage), &cfg.queryEngine) + targetManager = retrieval.NewTargetManager(fanoutStorage, logger) + queryEngine = promql.NewEngine(fanoutStorage, &cfg.queryEngine) ctx, cancelCtx = context.WithCancel(context.Background()) ) ruleManager := rules.NewManager(&rules.ManagerOptions{ - Appendable: tsdb.Adapter(localStorage), + Appendable: fanoutStorage, Notifier: notifier, QueryEngine: queryEngine, Context: ctx, @@ -296,8 +298,8 @@ func main() { // Start all components. The order is NOT arbitrary. defer func() { - if err := localStorage.Close(); err != nil { - logger.Errorln("Error stopping storage:", err) + if err := fanoutStorage.Close(); err != nil { + log.Errorln("Error stopping storage:", err) } }() diff --git a/documentation/examples/remote_storage/example_write_adapter/server.go b/documentation/examples/remote_storage/example_write_adapter/server.go index 38cf0dff9c..ec9218797a 100644 --- a/documentation/examples/remote_storage/example_write_adapter/server.go +++ b/documentation/examples/remote_storage/example_write_adapter/server.go @@ -22,7 +22,7 @@ import ( "github.com/golang/snappy" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/prompb" ) func main() { @@ -39,7 +39,7 @@ func main() { return } - var req remote.WriteRequest + var req prompb.WriteRequest if err := proto.Unmarshal(reqBuf, &req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -53,7 +53,7 @@ func main() { fmt.Println(m) for _, s := range ts.Samples { - fmt.Printf(" %f %d\n", s.Value, s.TimestampMs) + fmt.Printf(" %f %d\n", s.Value, s.Timestamp) } } }) diff --git a/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client.go b/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client.go index 9b446a12c8..68507f6aa8 100644 --- a/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client.go +++ b/documentation/examples/remote_storage/remote_storage_adapter/influxdb/client.go @@ -22,7 +22,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/log" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/prompb" influx "github.com/influxdata/influxdb/client/v2" ) @@ -101,8 +101,8 @@ func (c *Client) Write(samples model.Samples) error { return c.client.Write(bps) } -func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) { - labelsToSeries := map[string]*remote.TimeSeries{} +func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) { + labelsToSeries := map[string]*prompb.TimeSeries{} for _, q := range req.Queries { command, err := c.buildCommand(q) if err != nil { @@ -123,9 +123,9 @@ func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) { } } - resp := remote.ReadResponse{ - Results: []*remote.QueryResult{ - {Timeseries: make([]*remote.TimeSeries, 0, len(labelsToSeries))}, + resp := prompb.ReadResponse{ + Results: []*prompb.QueryResult{ + {Timeseries: make([]*prompb.TimeSeries, 0, len(labelsToSeries))}, }, } for _, ts := range labelsToSeries { @@ -134,7 +134,7 @@ func (c *Client) Read(req *remote.ReadRequest) (*remote.ReadResponse, error) { return &resp, nil } -func (c *Client) buildCommand(q *remote.Query) (string, error) { +func (c *Client) buildCommand(q *prompb.Query) (string, error) { matchers := make([]string, 0, len(q.Matchers)) // If we don't find a metric name matcher, query all metrics // (InfluxDB measurements) by default. @@ -142,9 +142,9 @@ func (c *Client) buildCommand(q *remote.Query) (string, error) { for _, m := range q.Matchers { if m.Name == model.MetricNameLabel { switch m.Type { - case remote.MatchType_EQUAL: + case prompb.LabelMatcher_EQ: from = fmt.Sprintf("FROM %q.%q", c.retentionPolicy, m.Value) - case remote.MatchType_REGEX_MATCH: + case prompb.LabelMatcher_RE: from = fmt.Sprintf("FROM %q./^%s$/", c.retentionPolicy, escapeSlashes(m.Value)) default: // TODO: Figure out how to support these efficiently. @@ -154,13 +154,13 @@ func (c *Client) buildCommand(q *remote.Query) (string, error) { } switch m.Type { - case remote.MatchType_EQUAL: + case prompb.LabelMatcher_EQ: matchers = append(matchers, fmt.Sprintf("%q = '%s'", m.Name, escapeSingleQuotes(m.Value))) - case remote.MatchType_NOT_EQUAL: + case prompb.LabelMatcher_NEQ: matchers = append(matchers, fmt.Sprintf("%q != '%s'", m.Name, escapeSingleQuotes(m.Value))) - case remote.MatchType_REGEX_MATCH: + case prompb.LabelMatcher_RE: matchers = append(matchers, fmt.Sprintf("%q =~ /^%s$/", m.Name, escapeSlashes(m.Value))) - case remote.MatchType_REGEX_NO_MATCH: + case prompb.LabelMatcher_NRE: matchers = append(matchers, fmt.Sprintf("%q !~ /^%s$/", m.Name, escapeSlashes(m.Value))) default: return "", fmt.Errorf("unknown match type %v", m.Type) @@ -180,13 +180,13 @@ func escapeSlashes(str string) string { return strings.Replace(str, `/`, `\/`, -1) } -func mergeResult(labelsToSeries map[string]*remote.TimeSeries, results []influx.Result) error { +func mergeResult(labelsToSeries map[string]*prompb.TimeSeries, results []influx.Result) error { for _, r := range results { for _, s := range r.Series { k := concatLabels(s.Tags) ts, ok := labelsToSeries[k] if !ok { - ts = &remote.TimeSeries{ + ts = &prompb.TimeSeries{ Labels: tagsToLabelPairs(s.Name, s.Tags), } labelsToSeries[k] = ts @@ -214,8 +214,8 @@ func concatLabels(labels map[string]string) string { return strings.Join(pairs, separator) } -func tagsToLabelPairs(name string, tags map[string]string) []*remote.LabelPair { - pairs := make([]*remote.LabelPair, 0, len(tags)) +func tagsToLabelPairs(name string, tags map[string]string) []*prompb.Label { + pairs := make([]*prompb.Label, 0, len(tags)) for k, v := range tags { if v == "" { // If we select metrics with different sets of labels names, @@ -226,20 +226,20 @@ func tagsToLabelPairs(name string, tags map[string]string) []*remote.LabelPair { // to make the result correct. continue } - pairs = append(pairs, &remote.LabelPair{ + pairs = append(pairs, &prompb.Label{ Name: k, Value: v, }) } - pairs = append(pairs, &remote.LabelPair{ + pairs = append(pairs, &prompb.Label{ Name: model.MetricNameLabel, Value: name, }) return pairs } -func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) { - samples := make([]*remote.Sample, 0, len(values)) +func valuesToSamples(values [][]interface{}) ([]*prompb.Sample, error) { + samples := make([]*prompb.Sample, 0, len(values)) for _, v := range values { if len(v) != 2 { return nil, fmt.Errorf("bad sample tuple length, expected [, ], got %v", v) @@ -265,9 +265,9 @@ func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) { return nil, fmt.Errorf("unable to convert sample value to float64: %v", err) } - samples = append(samples, &remote.Sample{ - TimestampMs: timestamp, - Value: value, + samples = append(samples, &prompb.Sample{ + Timestamp: timestamp, + Value: value, }) } return samples, nil @@ -275,14 +275,14 @@ func valuesToSamples(values [][]interface{}) ([]*remote.Sample, error) { // mergeSamples merges two lists of sample pairs and removes duplicate // timestamps. It assumes that both lists are sorted by timestamp. -func mergeSamples(a, b []*remote.Sample) []*remote.Sample { - result := make([]*remote.Sample, 0, len(a)+len(b)) +func mergeSamples(a, b []*prompb.Sample) []*prompb.Sample { + result := make([]*prompb.Sample, 0, len(a)+len(b)) i, j := 0, 0 for i < len(a) && j < len(b) { - if a[i].TimestampMs < b[j].TimestampMs { + if a[i].Timestamp < b[j].Timestamp { result = append(result, a[i]) i++ - } else if a[i].TimestampMs > b[j].TimestampMs { + } else if a[i].Timestamp > b[j].Timestamp { result = append(result, b[j]) j++ } else { diff --git a/documentation/examples/remote_storage/remote_storage_adapter/main.go b/documentation/examples/remote_storage/remote_storage_adapter/main.go index 87112b5969..3ea30bd8f5 100644 --- a/documentation/examples/remote_storage/remote_storage_adapter/main.go +++ b/documentation/examples/remote_storage/remote_storage_adapter/main.go @@ -36,7 +36,7 @@ import ( "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/graphite" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/influxdb" "github.com/prometheus/prometheus/documentation/examples/remote_storage/remote_storage_adapter/opentsdb" - "github.com/prometheus/prometheus/storage/remote" + "github.com/prometheus/prometheus/prompb" ) type config struct { @@ -146,7 +146,7 @@ type writer interface { } type reader interface { - Read(req *remote.ReadRequest) (*remote.ReadResponse, error) + Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) Name() string } @@ -196,7 +196,7 @@ func serve(addr string, writers []writer, readers []reader) error { return } - var req remote.WriteRequest + var req prompb.WriteRequest if err := proto.Unmarshal(reqBuf, &req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -229,7 +229,7 @@ func serve(addr string, writers []writer, readers []reader) error { return } - var req remote.ReadRequest + var req prompb.ReadRequest if err := proto.Unmarshal(reqBuf, &req); err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return @@ -242,7 +242,7 @@ func serve(addr string, writers []writer, readers []reader) error { } reader := readers[0] - var resp *remote.ReadResponse + var resp *prompb.ReadResponse resp, err = reader.Read(&req) if err != nil { log.With("query", req).With("storage", reader.Name()).With("err", err).Warnf("Error executing query") @@ -269,7 +269,7 @@ func serve(addr string, writers []writer, readers []reader) error { return http.ListenAndServe(addr, nil) } -func protoToSamples(req *remote.WriteRequest) model.Samples { +func protoToSamples(req *prompb.WriteRequest) model.Samples { var samples model.Samples for _, ts := range req.Timeseries { metric := make(model.Metric, len(ts.Labels)) @@ -281,7 +281,7 @@ func protoToSamples(req *remote.WriteRequest) model.Samples { samples = append(samples, &model.Sample{ Metric: metric, Value: model.SampleValue(s.Value), - Timestamp: model.Time(s.TimestampMs), + Timestamp: model.Time(s.Timestamp), }) } } diff --git a/prompb/remote.pb.go b/prompb/remote.pb.go new file mode 100644 index 0000000000..d3fd751d9f --- /dev/null +++ b/prompb/remote.pb.go @@ -0,0 +1,987 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: remote.proto + +/* + Package prompb is a generated protocol buffer package. + + It is generated from these files: + remote.proto + rpc.proto + types.proto + + It has these top-level messages: + WriteRequest + ReadRequest + ReadResponse + Query + QueryResult + TSDBSnapshotRequest + TSDBSnapshotResponse + SeriesDeleteRequest + SeriesDeleteResponse + Sample + TimeSeries + Label + Labels + LabelMatcher +*/ +package prompb + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type WriteRequest struct { + Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"` +} + +func (m *WriteRequest) Reset() { *m = WriteRequest{} } +func (m *WriteRequest) String() string { return proto.CompactTextString(m) } +func (*WriteRequest) ProtoMessage() {} +func (*WriteRequest) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{0} } + +func (m *WriteRequest) GetTimeseries() []*TimeSeries { + if m != nil { + return m.Timeseries + } + return nil +} + +type ReadRequest struct { + Queries []*Query `protobuf:"bytes,1,rep,name=queries" json:"queries,omitempty"` +} + +func (m *ReadRequest) Reset() { *m = ReadRequest{} } +func (m *ReadRequest) String() string { return proto.CompactTextString(m) } +func (*ReadRequest) ProtoMessage() {} +func (*ReadRequest) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{1} } + +func (m *ReadRequest) GetQueries() []*Query { + if m != nil { + return m.Queries + } + return nil +} + +type ReadResponse struct { + // In same order as the request's queries. + Results []*QueryResult `protobuf:"bytes,1,rep,name=results" json:"results,omitempty"` +} + +func (m *ReadResponse) Reset() { *m = ReadResponse{} } +func (m *ReadResponse) String() string { return proto.CompactTextString(m) } +func (*ReadResponse) ProtoMessage() {} +func (*ReadResponse) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{2} } + +func (m *ReadResponse) GetResults() []*QueryResult { + if m != nil { + return m.Results + } + return nil +} + +type Query struct { + StartTimestampMs int64 `protobuf:"varint,1,opt,name=start_timestamp_ms,json=startTimestampMs,proto3" json:"start_timestamp_ms,omitempty"` + EndTimestampMs int64 `protobuf:"varint,2,opt,name=end_timestamp_ms,json=endTimestampMs,proto3" json:"end_timestamp_ms,omitempty"` + Matchers []*LabelMatcher `protobuf:"bytes,3,rep,name=matchers" json:"matchers,omitempty"` +} + +func (m *Query) Reset() { *m = Query{} } +func (m *Query) String() string { return proto.CompactTextString(m) } +func (*Query) ProtoMessage() {} +func (*Query) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{3} } + +func (m *Query) GetStartTimestampMs() int64 { + if m != nil { + return m.StartTimestampMs + } + return 0 +} + +func (m *Query) GetEndTimestampMs() int64 { + if m != nil { + return m.EndTimestampMs + } + return 0 +} + +func (m *Query) GetMatchers() []*LabelMatcher { + if m != nil { + return m.Matchers + } + return nil +} + +type QueryResult struct { + Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"` +} + +func (m *QueryResult) Reset() { *m = QueryResult{} } +func (m *QueryResult) String() string { return proto.CompactTextString(m) } +func (*QueryResult) ProtoMessage() {} +func (*QueryResult) Descriptor() ([]byte, []int) { return fileDescriptorRemote, []int{4} } + +func (m *QueryResult) GetTimeseries() []*TimeSeries { + if m != nil { + return m.Timeseries + } + return nil +} + +func init() { + proto.RegisterType((*WriteRequest)(nil), "prometheus.WriteRequest") + proto.RegisterType((*ReadRequest)(nil), "prometheus.ReadRequest") + proto.RegisterType((*ReadResponse)(nil), "prometheus.ReadResponse") + proto.RegisterType((*Query)(nil), "prometheus.Query") + proto.RegisterType((*QueryResult)(nil), "prometheus.QueryResult") +} +func (m *WriteRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *WriteRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, msg := range m.Timeseries { + dAtA[i] = 0xa + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *ReadRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Queries) > 0 { + for _, msg := range m.Queries { + dAtA[i] = 0xa + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *ReadResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReadResponse) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Results) > 0 { + for _, msg := range m.Results { + dAtA[i] = 0xa + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *Query) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Query) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.StartTimestampMs != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.StartTimestampMs)) + } + if m.EndTimestampMs != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintRemote(dAtA, i, uint64(m.EndTimestampMs)) + } + if len(m.Matchers) > 0 { + for _, msg := range m.Matchers { + dAtA[i] = 0x1a + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *QueryResult) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryResult) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, msg := range m.Timeseries { + dAtA[i] = 0xa + i++ + i = encodeVarintRemote(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func encodeFixed64Remote(dAtA []byte, offset int, v uint64) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + dAtA[offset+4] = uint8(v >> 32) + dAtA[offset+5] = uint8(v >> 40) + dAtA[offset+6] = uint8(v >> 48) + dAtA[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Remote(dAtA []byte, offset int, v uint32) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintRemote(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *WriteRequest) Size() (n int) { + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, e := range m.Timeseries { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + return n +} + +func (m *ReadRequest) Size() (n int) { + var l int + _ = l + if len(m.Queries) > 0 { + for _, e := range m.Queries { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + return n +} + +func (m *ReadResponse) Size() (n int) { + var l int + _ = l + if len(m.Results) > 0 { + for _, e := range m.Results { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + return n +} + +func (m *Query) Size() (n int) { + var l int + _ = l + if m.StartTimestampMs != 0 { + n += 1 + sovRemote(uint64(m.StartTimestampMs)) + } + if m.EndTimestampMs != 0 { + n += 1 + sovRemote(uint64(m.EndTimestampMs)) + } + if len(m.Matchers) > 0 { + for _, e := range m.Matchers { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + return n +} + +func (m *QueryResult) Size() (n int) { + var l int + _ = l + if len(m.Timeseries) > 0 { + for _, e := range m.Timeseries { + l = e.Size() + n += 1 + l + sovRemote(uint64(l)) + } + } + return n +} + +func sovRemote(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozRemote(x uint64) (n int) { + return sovRemote(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *WriteRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: WriteRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: WriteRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Timeseries = append(m.Timeseries, &TimeSeries{}) + if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ReadRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReadRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReadRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Queries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Queries = append(m.Queries, &Query{}) + if err := m.Queries[len(m.Queries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *ReadResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReadResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReadResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Results", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Results = append(m.Results, &QueryResult{}) + if err := m.Results[len(m.Results)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Query) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Query: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Query: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StartTimestampMs", wireType) + } + m.StartTimestampMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StartTimestampMs |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EndTimestampMs", wireType) + } + m.EndTimestampMs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.EndTimestampMs |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Matchers", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Matchers = append(m.Matchers, &LabelMatcher{}) + if err := m.Matchers[len(m.Matchers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *QueryResult) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: QueryResult: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: QueryResult: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRemote + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRemote + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Timeseries = append(m.Timeseries, &TimeSeries{}) + if err := m.Timeseries[len(m.Timeseries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipRemote(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthRemote + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipRemote(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRemote + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRemote + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRemote + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthRemote + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowRemote + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipRemote(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthRemote = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowRemote = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("remote.proto", fileDescriptorRemote) } + +var fileDescriptorRemote = []byte{ + // 284 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x91, 0xcf, 0x4a, 0xc3, 0x40, + 0x10, 0xc6, 0x59, 0x8b, 0xad, 0x4c, 0x8a, 0xd4, 0x45, 0x34, 0x78, 0x08, 0x92, 0x53, 0x40, 0x09, + 0xf8, 0x07, 0x0f, 0xde, 0x14, 0xf4, 0x64, 0x0f, 0xae, 0x05, 0xc1, 0x4b, 0x49, 0xcc, 0x40, 0x03, + 0xdd, 0x64, 0xbb, 0x33, 0x39, 0xf4, 0x41, 0x7c, 0x27, 0x8f, 0x3e, 0x82, 0xe4, 0x49, 0xa4, 0x1b, + 0xa2, 0x2b, 0xde, 0x3c, 0xff, 0x7e, 0xdf, 0xc7, 0xc7, 0x0c, 0x8c, 0x2d, 0xea, 0x9a, 0x31, 0x35, + 0xb6, 0xe6, 0x5a, 0x82, 0xb1, 0xb5, 0x46, 0x5e, 0x60, 0x43, 0x47, 0x01, 0xaf, 0x0d, 0x52, 0x07, + 0xe2, 0x7b, 0x18, 0x3f, 0xdb, 0x92, 0x51, 0xe1, 0xaa, 0x41, 0x62, 0x79, 0x05, 0xc0, 0xa5, 0x46, + 0x42, 0x5b, 0x22, 0x85, 0xe2, 0x78, 0x90, 0x04, 0xe7, 0x07, 0xe9, 0x4f, 0x3a, 0x9d, 0x95, 0x1a, + 0x9f, 0x1c, 0x55, 0x9e, 0x19, 0x5f, 0x43, 0xa0, 0x30, 0x2b, 0xfa, 0x9a, 0x13, 0x18, 0xad, 0x1a, + 0xbf, 0x63, 0xcf, 0xef, 0x78, 0x6c, 0xd0, 0xae, 0x55, 0x6f, 0xc4, 0x37, 0x30, 0xee, 0xb2, 0x64, + 0xea, 0x8a, 0x50, 0x9e, 0xc1, 0xc8, 0x22, 0x35, 0x4b, 0xee, 0xc3, 0x87, 0x7f, 0xc3, 0x8e, 0xab, + 0xde, 0x8b, 0xdf, 0x04, 0x6c, 0x3b, 0x20, 0x4f, 0x41, 0x12, 0x67, 0x96, 0xe7, 0x6e, 0x1c, 0x67, + 0xda, 0xcc, 0xf5, 0xa6, 0x47, 0x24, 0x03, 0x35, 0x71, 0x64, 0xd6, 0x83, 0x29, 0xc9, 0x04, 0x26, + 0x58, 0x15, 0xbf, 0xdd, 0x2d, 0xe7, 0xee, 0x62, 0x55, 0xf8, 0xe6, 0x25, 0xec, 0xe8, 0x8c, 0x5f, + 0x17, 0x68, 0x29, 0x1c, 0xb8, 0x55, 0xa1, 0xbf, 0xea, 0x21, 0xcb, 0x71, 0x39, 0xed, 0x04, 0xf5, + 0x6d, 0xc6, 0x77, 0x10, 0x78, 0x7b, 0xff, 0x7b, 0xdd, 0xdb, 0xfd, 0xf7, 0x36, 0x12, 0x1f, 0x6d, + 0x24, 0x3e, 0xdb, 0x48, 0xbc, 0x0c, 0x37, 0x01, 0x93, 0xe7, 0x43, 0xf7, 0xc2, 0x8b, 0xaf, 0x00, + 0x00, 0x00, 0xff, 0xff, 0x0a, 0x31, 0xaa, 0x60, 0xeb, 0x01, 0x00, 0x00, +} diff --git a/prompb/remote.proto b/prompb/remote.proto new file mode 100644 index 0000000000..6049d54684 --- /dev/null +++ b/prompb/remote.proto @@ -0,0 +1,42 @@ +// Copyright 2016 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; +package prometheus; + +option go_package = "prompb"; + +import "types.proto"; + +message WriteRequest { + repeated prometheus.TimeSeries timeseries = 1; +} + +message ReadRequest { + repeated Query queries = 1; +} + +message ReadResponse { + // In same order as the request's queries. + repeated QueryResult results = 1; +} + +message Query { + int64 start_timestamp_ms = 1; + int64 end_timestamp_ms = 2; + repeated prometheus.LabelMatcher matchers = 3; +} + +message QueryResult { + repeated prometheus.TimeSeries timeseries = 1; +} diff --git a/retrieval/helpers_test.go b/retrieval/helpers_test.go index 4961a968dd..05aadb3ea4 100644 --- a/retrieval/helpers_test.go +++ b/retrieval/helpers_test.go @@ -27,16 +27,16 @@ func (a nopAppendable) Appender() (storage.Appender, error) { type nopAppender struct{} -func (a nopAppender) Add(labels.Labels, int64, float64) (string, error) { return "", nil } -func (a nopAppender) AddFast(string, int64, float64) error { return nil } -func (a nopAppender) Commit() error { return nil } -func (a nopAppender) Rollback() error { return nil } +func (a nopAppender) Add(labels.Labels, int64, float64) (string, error) { return "", nil } +func (a nopAppender) AddFast(labels.Labels, string, int64, float64) error { return nil } +func (a nopAppender) Commit() error { return nil } +func (a nopAppender) Rollback() error { return nil } type collectResultAppender struct { result []sample } -func (a *collectResultAppender) AddFast(ref string, t int64, v float64) error { +func (a *collectResultAppender) AddFast(m labels.Labels, ref string, t int64, v float64) error { // Not implemented. return storage.ErrNotFound } diff --git a/retrieval/scrape.go b/retrieval/scrape.go index 378198f505..db4e02f27d 100644 --- a/retrieval/scrape.go +++ b/retrieval/scrape.go @@ -773,7 +773,8 @@ loop: ref, ok := sl.cache.getRef(yoloString(met)) if ok { - switch err = app.AddFast(ref, t, v); err { + lset := sl.cache.lsets[ref].lset + switch err = app.AddFast(lset, ref, t, v); err { case nil: if tp == nil { e := sl.cache.lsets[ref] @@ -975,7 +976,8 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v ref, ok := sl.cache.getRef(s2) if ok { - err := app.AddFast(ref, t, v) + lset := sl.cache.lsets[ref].lset + err := app.AddFast(lset, ref, t, v) switch err { case nil: return nil @@ -989,13 +991,13 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s string, t int64, v return err } } - met := labels.Labels{ + lset := labels.Labels{ labels.Label{Name: labels.MetricName, Value: s}, } - ref, err := app.Add(met, t, v) + ref, err := app.Add(lset, t, v) switch err { case nil: - sl.cache.addRef(s2, ref, met, met.Hash()) + sl.cache.addRef(s2, ref, lset, lset.Hash()) return nil case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp: return nil diff --git a/retrieval/scrape_test.go b/retrieval/scrape_test.go index 7ef8f6a818..5b4e23b554 100644 --- a/retrieval/scrape_test.go +++ b/retrieval/scrape_test.go @@ -885,8 +885,8 @@ func (app *errorAppender) Add(lset labels.Labels, t int64, v float64) (string, e } } -func (app *errorAppender) AddFast(ref string, t int64, v float64) error { - return app.collectResultAppender.AddFast(ref, t, v) +func (app *errorAppender) AddFast(lset labels.Labels, ref string, t int64, v float64) error { + return app.collectResultAppender.AddFast(lset, ref, t, v) } func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T) { diff --git a/retrieval/target.go b/retrieval/target.go index 2949074a65..f4f1e4609d 100644 --- a/retrieval/target.go +++ b/retrieval/target.go @@ -212,14 +212,14 @@ func (app *limitAppender) Add(lset labels.Labels, t int64, v float64) (string, e return ref, nil } -func (app *limitAppender) AddFast(ref string, t int64, v float64) error { +func (app *limitAppender) AddFast(lset labels.Labels, ref string, t int64, v float64) error { if !value.IsStaleNaN(v) { app.i++ if app.i > app.limit { return errSampleLimit } } - if err := app.Appender.AddFast(ref, t, v); err != nil { + if err := app.Appender.AddFast(lset, ref, t, v); err != nil { return err } return nil @@ -243,11 +243,11 @@ func (app *timeLimitAppender) Add(lset labels.Labels, t int64, v float64) (strin return ref, nil } -func (app *timeLimitAppender) AddFast(ref string, t int64, v float64) error { +func (app *timeLimitAppender) AddFast(lset labels.Labels, ref string, t int64, v float64) error { if t > app.maxTime { return storage.ErrOutOfBounds } - if err := app.Appender.AddFast(ref, t, v); err != nil { + if err := app.Appender.AddFast(lset, ref, t, v); err != nil { return err } return nil diff --git a/storage/fanout.go b/storage/fanout.go new file mode 100644 index 0000000000..b44bf34fe3 --- /dev/null +++ b/storage/fanout.go @@ -0,0 +1,433 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.package remote + +package storage + +import ( + "container/heap" + "strings" + + "github.com/prometheus/common/log" + "github.com/prometheus/prometheus/pkg/labels" +) + +type fanout struct { + primary Storage + secondaries []Storage +} + +// NewFanout returns a new fan-out Storage, which proxies reads and writes +// through to multiple underlying storages. +func NewFanout(primary Storage, secondaries ...Storage) Storage { + return &fanout{ + primary: primary, + secondaries: secondaries, + } +} + +func (f *fanout) Querier(mint, maxt int64) (Querier, error) { + queriers := mergeQuerier{ + queriers: make([]Querier, 0, 1+len(f.secondaries)), + } + + // Add primary querier + querier, err := f.primary.Querier(mint, maxt) + if err != nil { + return nil, err + } + queriers.queriers = append(queriers.queriers, querier) + + // Add secondary queriers + for _, storage := range f.secondaries { + querier, err := storage.Querier(mint, maxt) + if err != nil { + queriers.Close() + return nil, err + } + queriers.queriers = append(queriers.queriers, querier) + } + return &queriers, nil +} + +func (f *fanout) Appender() (Appender, error) { + primary, err := f.primary.Appender() + if err != nil { + return nil, err + } + + secondaries := make([]Appender, 0, len(f.secondaries)) + for _, storage := range f.secondaries { + appender, err := storage.Appender() + if err != nil { + return nil, err + } + secondaries = append(secondaries, appender) + } + return &fanoutAppender{ + primary: primary, + secondaries: secondaries, + }, nil +} + +// Close closes the storage and all its underlying resources. +func (f *fanout) Close() error { + if err := f.primary.Close(); err != nil { + return err + } + + // TODO return multiple errors? + var lastErr error + for _, storage := range f.secondaries { + if err := storage.Close(); err != nil { + lastErr = err + } + } + return lastErr +} + +// fanoutAppender implements Appender. +type fanoutAppender struct { + primary Appender + secondaries []Appender +} + +func (f *fanoutAppender) Add(l labels.Labels, t int64, v float64) (string, error) { + ref, err := f.primary.Add(l, t, v) + if err != nil { + return ref, err + } + + for _, appender := range f.secondaries { + if _, err := appender.Add(l, t, v); err != nil { + return "", err + } + } + return ref, nil +} + +func (f *fanoutAppender) AddFast(l labels.Labels, ref string, t int64, v float64) error { + if err := f.primary.AddFast(l, ref, t, v); err != nil { + return err + } + + for _, appender := range f.secondaries { + if _, err := appender.Add(l, t, v); err != nil { + return err + } + } + return nil +} + +func (f *fanoutAppender) Commit() (err error) { + err = f.primary.Commit() + + for _, appender := range f.secondaries { + if err == nil { + err = appender.Commit() + } else { + if rollbackErr := appender.Rollback(); rollbackErr != nil { + log.Errorf("Squashed rollback error on commit: %v", rollbackErr) + } + } + } + return +} + +func (f *fanoutAppender) Rollback() (err error) { + err = f.primary.Rollback() + + for _, appender := range f.secondaries { + rollbackErr := appender.Rollback() + if err == nil { + err = rollbackErr + } else if rollbackErr != nil { + log.Errorf("Squashed rollback error on rollback: %v", rollbackErr) + } + } + return nil +} + +// mergeQuerier implements Querier. +type mergeQuerier struct { + queriers []Querier +} + +// NewMergeQuerier returns a new Querier that merges results of input queriers. +func NewMergeQuerier(queriers []Querier) Querier { + return &mergeQuerier{ + queriers: queriers, + } +} + +// Select returns a set of series that matches the given label matchers. +func (q *mergeQuerier) Select(matchers ...*labels.Matcher) SeriesSet { + seriesSets := make([]SeriesSet, 0, len(q.queriers)) + for _, querier := range q.queriers { + seriesSets = append(seriesSets, querier.Select(matchers...)) + } + return newMergeSeriesSet(seriesSets) +} + +// LabelValues returns all potential values for a label name. +func (q *mergeQuerier) LabelValues(name string) ([]string, error) { + var results [][]string + for _, querier := range q.queriers { + values, err := querier.LabelValues(name) + if err != nil { + return nil, err + } + results = append(results, values) + } + return mergeStringSlices(results), nil +} + +func mergeStringSlices(ss [][]string) []string { + switch len(ss) { + case 0: + return nil + case 1: + return ss[0] + case 2: + return mergeTwoStringSlices(ss[0], ss[1]) + default: + halfway := len(ss) / 2 + return mergeTwoStringSlices( + mergeStringSlices(ss[:halfway]), + mergeStringSlices(ss[halfway:]), + ) + } +} + +func mergeTwoStringSlices(a, b []string) []string { + i, j := 0, 0 + result := make([]string, 0, len(a)+len(b)) + for i < len(a) && j < len(b) { + switch strings.Compare(a[i], b[j]) { + case 0: + result = append(result, a[i]) + i++ + j++ + case -1: + result = append(result, a[i]) + i++ + case 1: + result = append(result, b[j]) + j++ + } + } + result = append(result, a[i:]...) + result = append(result, b[j:]...) + return result +} + +// Close releases the resources of the Querier. +func (q *mergeQuerier) Close() error { + // TODO return multiple errors? + var lastErr error + for _, querier := range q.queriers { + if err := querier.Close(); err != nil { + lastErr = err + } + } + return lastErr +} + +// mergeSeriesSet implements SeriesSet +type mergeSeriesSet struct { + currentLabels labels.Labels + currentSets []SeriesSet + heap seriesSetHeap + sets []SeriesSet +} + +func newMergeSeriesSet(sets []SeriesSet) SeriesSet { + // Sets need to be pre-advanced, so we can introspect the label of the + // series under the cursor. + var h seriesSetHeap + for _, set := range sets { + if set.Next() { + heap.Push(&h, set) + } + } + return &mergeSeriesSet{ + heap: h, + sets: sets, + } +} + +func (c *mergeSeriesSet) Next() bool { + // Firstly advance all the current series sets. If any of them have run out + // we can drop them, otherwise they should be inserted back into the heap. + for _, set := range c.currentSets { + if set.Next() { + heap.Push(&c.heap, set) + } + } + if len(c.heap) == 0 { + return false + } + + // Now, pop items of the heap that have equal label sets. + c.currentSets = nil + c.currentLabels = c.heap[0].At().Labels() + for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) { + set := heap.Pop(&c.heap).(SeriesSet) + c.currentSets = append(c.currentSets, set) + } + return true +} + +func (c *mergeSeriesSet) At() Series { + series := []Series{} + for _, seriesSet := range c.currentSets { + series = append(series, seriesSet.At()) + } + return &mergeSeries{ + labels: c.currentLabels, + series: series, + } +} + +func (c *mergeSeriesSet) Err() error { + for _, set := range c.sets { + if err := set.Err(); err != nil { + return err + } + } + return nil +} + +type seriesSetHeap []SeriesSet + +func (h seriesSetHeap) Len() int { return len(h) } +func (h seriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h seriesSetHeap) Less(i, j int) bool { + a, b := h[i].At().Labels(), h[j].At().Labels() + return labels.Compare(a, b) < 0 +} + +func (h *seriesSetHeap) Push(x interface{}) { + *h = append(*h, x.(SeriesSet)) +} + +func (h *seriesSetHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +type mergeSeries struct { + labels labels.Labels + series []Series +} + +func (m *mergeSeries) Labels() labels.Labels { + return m.labels +} + +func (m *mergeSeries) Iterator() SeriesIterator { + iterators := make([]SeriesIterator, 0, len(m.series)) + for _, s := range m.series { + iterators = append(iterators, s.Iterator()) + } + return newMergeIterator(iterators) +} + +type mergeIterator struct { + iterators []SeriesIterator + h seriesIteratorHeap +} + +func newMergeIterator(iterators []SeriesIterator) SeriesIterator { + return &mergeIterator{ + iterators: iterators, + h: nil, + } +} + +func (c *mergeIterator) Seek(t int64) bool { + c.h = seriesIteratorHeap{} + for _, iter := range c.iterators { + if iter.Seek(t) { + heap.Push(&c.h, iter) + } + } + return len(c.h) > 0 +} + +func (c *mergeIterator) At() (t int64, v float64) { + if len(c.h) == 0 { + log.Error("mergeIterator.At() called after .Next() returned false.") + return 0, 0 + } + + // TODO do I need to dedupe or just merge? + return c.h[0].At() +} + +func (c *mergeIterator) Next() bool { + if c.h == nil { + for _, iter := range c.iterators { + if iter.Next() { + heap.Push(&c.h, iter) + } + } + return len(c.h) > 0 + } + + if len(c.h) == 0 { + return false + } + + iter := heap.Pop(&c.h).(SeriesIterator) + if iter.Next() { + heap.Push(&c.h, iter) + } + + return len(c.h) > 0 +} + +func (c *mergeIterator) Err() error { + for _, iter := range c.iterators { + if err := iter.Err(); err != nil { + return err + } + } + return nil +} + +type seriesIteratorHeap []SeriesIterator + +func (h seriesIteratorHeap) Len() int { return len(h) } +func (h seriesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h seriesIteratorHeap) Less(i, j int) bool { + at, _ := h[i].At() + bt, _ := h[j].At() + return at < bt +} + +func (h *seriesIteratorHeap) Push(x interface{}) { + *h = append(*h, x.(SeriesIterator)) +} + +func (h *seriesIteratorHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/storage/fanout_test.go b/storage/fanout_test.go new file mode 100644 index 0000000000..19e1600fbc --- /dev/null +++ b/storage/fanout_test.go @@ -0,0 +1,230 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.package remote + +package storage + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/pkg/labels" +) + +func TestMergeStringSlices(t *testing.T) { + for _, tc := range []struct { + input [][]string + expected []string + }{ + {}, + {[][]string{{"foo"}}, []string{"foo"}}, + {[][]string{{"foo"}, {"bar"}}, []string{"bar", "foo"}}, + {[][]string{{"foo"}, {"bar"}, {"baz"}}, []string{"bar", "baz", "foo"}}, + } { + require.Equal(t, tc.expected, mergeStringSlices(tc.input)) + } +} + +func TestMergeTwoStringSlices(t *testing.T) { + for _, tc := range []struct { + a, b, expected []string + }{ + {[]string{}, []string{}, []string{}}, + {[]string{"foo"}, nil, []string{"foo"}}, + {nil, []string{"bar"}, []string{"bar"}}, + {[]string{"foo"}, []string{"bar"}, []string{"bar", "foo"}}, + {[]string{"foo"}, []string{"bar", "baz"}, []string{"bar", "baz", "foo"}}, + {[]string{"foo"}, []string{"foo"}, []string{"foo"}}, + } { + require.Equal(t, tc.expected, mergeTwoStringSlices(tc.a, tc.b)) + } +} + +func TestMergeSeriesSet(t *testing.T) { + for _, tc := range []struct { + input []SeriesSet + expected SeriesSet + }{ + { + input: []SeriesSet{newMockSeriesSet()}, + expected: newMockSeriesSet(), + }, + + { + input: []SeriesSet{newMockSeriesSet( + newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}), + newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}), + )}, + expected: newMockSeriesSet( + newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}), + newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}), + ), + }, + + { + input: []SeriesSet{newMockSeriesSet( + newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}), + ), newMockSeriesSet( + newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}), + )}, + expected: newMockSeriesSet( + newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}), + newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}), + ), + }, + + { + input: []SeriesSet{newMockSeriesSet( + newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}), + newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}), + ), newMockSeriesSet( + newMockSeries(labels.FromStrings("bar", "baz"), []sample{{3, 3}, {4, 4}}), + newMockSeries(labels.FromStrings("foo", "bar"), []sample{{2, 2}, {3, 3}}), + )}, + expected: newMockSeriesSet( + newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}, {3, 3}, {4, 4}}), + newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}}), + ), + }, + } { + merged := newMergeSeriesSet(tc.input) + for merged.Next() { + require.True(t, tc.expected.Next()) + actualSeries := merged.At() + expectedSeries := tc.expected.At() + require.Equal(t, expectedSeries.Labels(), actualSeries.Labels()) + require.Equal(t, drainSamples(expectedSeries.Iterator()), drainSamples(actualSeries.Iterator())) + } + require.False(t, tc.expected.Next()) + } +} + +func TestMergeIterator(t *testing.T) { + for _, tc := range []struct { + input []SeriesIterator + expected []sample + }{ + { + input: []SeriesIterator{ + newListSeriesIterator([]sample{{0, 0}, {1, 1}}), + }, + expected: []sample{{0, 0}, {1, 1}}, + }, + { + input: []SeriesIterator{ + newListSeriesIterator([]sample{{0, 0}, {1, 1}}), + newListSeriesIterator([]sample{{2, 2}, {3, 3}}), + }, + expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}}, + }, + { + input: []SeriesIterator{ + newListSeriesIterator([]sample{{0, 0}, {3, 3}}), + newListSeriesIterator([]sample{{1, 1}, {4, 4}}), + newListSeriesIterator([]sample{{2, 2}, {5, 5}}), + }, + expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}}, + }, + } { + merged := newMergeIterator(tc.input) + actual := drainSamples(merged) + require.Equal(t, tc.expected, actual) + } +} + +func TestMergeIteratorSeek(t *testing.T) { + for _, tc := range []struct { + input []SeriesIterator + seek int64 + expected []sample + }{ + { + input: []SeriesIterator{ + newListSeriesIterator([]sample{{0, 0}, {1, 1}, {2, 2}}), + }, + seek: 1, + expected: []sample{{1, 1}, {2, 2}}, + }, + { + input: []SeriesIterator{ + newListSeriesIterator([]sample{{0, 0}, {1, 1}}), + newListSeriesIterator([]sample{{2, 2}, {3, 3}}), + }, + seek: 2, + expected: []sample{{2, 2}, {3, 3}}, + }, + { + input: []SeriesIterator{ + newListSeriesIterator([]sample{{0, 0}, {3, 3}}), + newListSeriesIterator([]sample{{1, 1}, {4, 4}}), + newListSeriesIterator([]sample{{2, 2}, {5, 5}}), + }, + seek: 2, + expected: []sample{{2, 2}, {3, 3}, {4, 4}, {5, 5}}, + }, + } { + merged := newMergeIterator(tc.input) + actual := []sample{} + if merged.Seek(tc.seek) { + t, v := merged.At() + actual = append(actual, sample{t, v}) + } + actual = append(actual, drainSamples(merged)...) + require.Equal(t, tc.expected, actual) + } +} + +func drainSamples(iter SeriesIterator) []sample { + result := []sample{} + for iter.Next() { + t, v := iter.At() + result = append(result, sample{t, v}) + } + return result +} + +type mockSeriesSet struct { + idx int + series []Series +} + +func newMockSeriesSet(series ...Series) SeriesSet { + return &mockSeriesSet{ + idx: -1, + series: series, + } +} + +func (m *mockSeriesSet) Next() bool { + m.idx++ + return m.idx < len(m.series) +} + +func (m *mockSeriesSet) At() Series { + return m.series[m.idx] +} + +func (m *mockSeriesSet) Err() error { + return nil +} + +func newMockSeries(lset labels.Labels, samples []sample) Series { + return &mockSeries{ + labels: func() labels.Labels { + return lset + }, + iterator: func() SeriesIterator { + return newListSeriesIterator(samples) + }, + } +} diff --git a/storage/interface.go b/storage/interface.go index a1b5221a28..cb07728673 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -56,7 +56,7 @@ type Querier interface { type Appender interface { Add(l labels.Labels, t int64, v float64) (string, error) - AddFast(ref string, t int64, v float64) error + AddFast(l labels.Labels, ref string, t int64, v float64) error // Commit submits the collected samples and purges the batch. Commit() error diff --git a/storage/remote/client.go b/storage/remote/client.go new file mode 100644 index 0000000000..cca103980e --- /dev/null +++ b/storage/remote/client.go @@ -0,0 +1,201 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "bufio" + "bytes" + "fmt" + "io" + "io/ioutil" + "net/http" + "time" + + "github.com/golang/protobuf/proto" + "github.com/golang/snappy" + "golang.org/x/net/context" + "golang.org/x/net/context/ctxhttp" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/util/httputil" +) + +const maxErrMsgLen = 256 + +// Client allows reading and writing from/to a remote HTTP endpoint. +type Client struct { + index int // Used to differentiate metrics. + url *config.URL + client *http.Client + timeout time.Duration +} + +type clientConfig struct { + url *config.URL + timeout model.Duration + httpClientConfig config.HTTPClientConfig +} + +// NewClient creates a new Client. +func NewClient(index int, conf *clientConfig) (*Client, error) { + httpClient, err := httputil.NewClientFromConfig(conf.httpClientConfig) + if err != nil { + return nil, err + } + + return &Client{ + index: index, + url: conf.url, + client: httpClient, + timeout: time.Duration(conf.timeout), + }, nil +} + +type recoverableError struct { + error +} + +// Store sends a batch of samples to the HTTP endpoint. +func (c *Client) Store(samples model.Samples) error { + req := &prompb.WriteRequest{ + Timeseries: make([]*prompb.TimeSeries, 0, len(samples)), + } + for _, s := range samples { + ts := &prompb.TimeSeries{ + Labels: make([]*prompb.Label, 0, len(s.Metric)), + } + for k, v := range s.Metric { + ts.Labels = append(ts.Labels, + &prompb.Label{ + Name: string(k), + Value: string(v), + }) + } + ts.Samples = []*prompb.Sample{ + { + Value: float64(s.Value), + Timestamp: int64(s.Timestamp), + }, + } + req.Timeseries = append(req.Timeseries, ts) + } + + data, err := proto.Marshal(req) + if err != nil { + return err + } + + compressed := snappy.Encode(nil, data) + httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed)) + if err != nil { + // Errors from NewRequest are from unparseable URLs, so are not + // recoverable. + return err + } + httpReq.Header.Add("Content-Encoding", "snappy") + httpReq.Header.Set("Content-Type", "application/x-protobuf") + httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0") + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + httpResp, err := ctxhttp.Do(ctx, c.client, httpReq) + if err != nil { + // Errors from client.Do are from (for example) network errors, so are + // recoverable. + return recoverableError{err} + } + defer httpResp.Body.Close() + + if httpResp.StatusCode/100 != 2 { + scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen)) + line := "" + if scanner.Scan() { + line = scanner.Text() + } + err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) + } + if httpResp.StatusCode/100 == 5 { + return recoverableError{err} + } + return err +} + +// Name identifies the client. +func (c Client) Name() string { + return fmt.Sprintf("%d:%s", c.index, c.url) +} + +// Read reads from a remote endpoint. +func (c *Client) Read(ctx context.Context, from, through int64, matchers []*prompb.LabelMatcher) ([]*prompb.TimeSeries, error) { + req := &prompb.ReadRequest{ + // TODO: Support batching multiple queries into one read request, + // as the protobuf interface allows for it. + Queries: []*prompb.Query{{ + StartTimestampMs: from, + EndTimestampMs: through, + Matchers: matchers, + }}, + } + + data, err := proto.Marshal(req) + if err != nil { + return nil, fmt.Errorf("unable to marshal read request: %v", err) + } + + compressed := snappy.Encode(nil, data) + httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed)) + if err != nil { + return nil, fmt.Errorf("unable to create request: %v", err) + } + httpReq.Header.Add("Content-Encoding", "snappy") + httpReq.Header.Set("Content-Type", "application/x-protobuf") + httpReq.Header.Set("X-Prometheus-Remote-Read-Version", "0.1.0") + + ctx, cancel := context.WithTimeout(ctx, c.timeout) + defer cancel() + + httpResp, err := ctxhttp.Do(ctx, c.client, httpReq) + if err != nil { + return nil, fmt.Errorf("error sending request: %v", err) + } + defer httpResp.Body.Close() + if httpResp.StatusCode/100 != 2 { + return nil, fmt.Errorf("server returned HTTP status %s", httpResp.Status) + } + + compressed, err = ioutil.ReadAll(httpResp.Body) + if err != nil { + return nil, fmt.Errorf("error reading response: %v", err) + } + + uncompressed, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, fmt.Errorf("error reading response: %v", err) + } + + var resp prompb.ReadResponse + err = proto.Unmarshal(uncompressed, &resp) + if err != nil { + return nil, fmt.Errorf("unable to unmarshal response body: %v", err) + } + + if len(resp.Results) != len(req.Queries) { + return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results)) + } + + return resp.Results[0].Timeseries, nil +} diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go new file mode 100644 index 0000000000..32a2ae3206 --- /dev/null +++ b/storage/remote/client_test.go @@ -0,0 +1,79 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.package remote + +package remote + +import ( + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "reflect" + "strings" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" +) + +var longErrMessage = strings.Repeat("error message", maxErrMsgLen) + +func TestStoreHTTPErrorHandling(t *testing.T) { + tests := []struct { + code int + err error + }{ + { + code: 200, + err: nil, + }, + { + code: 300, + err: fmt.Errorf("server returned HTTP status 300 Multiple Choices: " + longErrMessage[:maxErrMsgLen]), + }, + { + code: 404, + err: fmt.Errorf("server returned HTTP status 404 Not Found: " + longErrMessage[:maxErrMsgLen]), + }, + { + code: 500, + err: recoverableError{fmt.Errorf("server returned HTTP status 500 Internal Server Error: " + longErrMessage[:maxErrMsgLen])}, + }, + } + + for i, test := range tests { + server := httptest.NewServer( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, longErrMessage, test.code) + }), + ) + + serverURL, err := url.Parse(server.URL) + if err != nil { + panic(err) + } + + c, err := NewClient(0, &clientConfig{ + url: &config.URL{serverURL}, + timeout: model.Duration(time.Second), + }) + + err = c.Store(nil) + if !reflect.DeepEqual(err, test.err) { + t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err) + } + + server.Close() + } +} diff --git a/storage/remote/ewma.go b/storage/remote/ewma.go new file mode 100644 index 0000000000..82b6dd1014 --- /dev/null +++ b/storage/remote/ewma.go @@ -0,0 +1,68 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "sync" + "sync/atomic" + "time" +) + +// ewmaRate tracks an exponentially weighted moving average of a per-second rate. +type ewmaRate struct { + newEvents int64 + alpha float64 + interval time.Duration + lastRate float64 + init bool + mutex sync.Mutex +} + +// newEWMARate always allocates a new ewmaRate, as this guarantees the atomically +// accessed int64 will be aligned on ARM. See prometheus#2666. +func newEWMARate(alpha float64, interval time.Duration) *ewmaRate { + return &ewmaRate{ + alpha: alpha, + interval: interval, + } +} + +// rate returns the per-second rate. +func (r *ewmaRate) rate() float64 { + r.mutex.Lock() + defer r.mutex.Unlock() + return r.lastRate +} + +// tick assumes to be called every r.interval. +func (r *ewmaRate) tick() { + newEvents := atomic.LoadInt64(&r.newEvents) + atomic.AddInt64(&r.newEvents, -newEvents) + instantRate := float64(newEvents) / r.interval.Seconds() + + r.mutex.Lock() + defer r.mutex.Unlock() + + if r.init { + r.lastRate += r.alpha * (instantRate - r.lastRate) + } else { + r.init = true + r.lastRate = instantRate + } +} + +// inc counts one event. +func (r *ewmaRate) incr(incr int64) { + atomic.AddInt64(&r.newEvents, incr) +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go new file mode 100644 index 0000000000..2dd14e06d8 --- /dev/null +++ b/storage/remote/queue_manager.go @@ -0,0 +1,514 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "math" + "sync" + "time" + + "golang.org/x/time/rate" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/log" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/relabel" +) + +// String constants for instrumentation. +const ( + namespace = "prometheus" + subsystem = "remote_storage" + queue = "queue" + + // We track samples in/out and how long pushes take using an Exponentially + // Weighted Moving Average. + ewmaWeight = 0.2 + shardUpdateDuration = 10 * time.Second + + // Allow 30% too many shards before scaling down. + shardToleranceFraction = 0.3 + + // Limit to 1 log event every 10s + logRateLimit = 0.1 + logBurst = 10 +) + +var ( + succeededSamplesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "succeeded_samples_total", + Help: "Total number of samples successfully sent to remote storage.", + }, + []string{queue}, + ) + failedSamplesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "failed_samples_total", + Help: "Total number of samples which failed on send to remote storage.", + }, + []string{queue}, + ) + droppedSamplesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "dropped_samples_total", + Help: "Total number of samples which were dropped due to the queue being full.", + }, + []string{queue}, + ) + sentBatchDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "sent_batch_duration_seconds", + Help: "Duration of sample batch send calls to the remote storage.", + Buckets: prometheus.DefBuckets, + }, + []string{queue}, + ) + queueLength = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "queue_length", + Help: "The number of processed samples queued to be sent to the remote storage.", + }, + []string{queue}, + ) + queueCapacity = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "queue_capacity", + Help: "The capacity of the queue of samples to be sent to the remote storage.", + }, + []string{queue}, + ) + numShards = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "shards", + Help: "The number of shards used for parallel sending to the remote storage.", + }, + []string{queue}, + ) +) + +func init() { + prometheus.MustRegister(succeededSamplesTotal) + prometheus.MustRegister(failedSamplesTotal) + prometheus.MustRegister(droppedSamplesTotal) + prometheus.MustRegister(sentBatchDuration) + prometheus.MustRegister(queueLength) + prometheus.MustRegister(queueCapacity) + prometheus.MustRegister(numShards) +} + +// QueueManagerConfig is the configuration for the queue used to write to remote +// storage. +type QueueManagerConfig struct { + // Number of samples to buffer per shard before we start dropping them. + QueueCapacity int + // Max number of shards, i.e. amount of concurrency. + MaxShards int + // Maximum number of samples per send. + MaxSamplesPerSend int + // Maximum time sample will wait in buffer. + BatchSendDeadline time.Duration + // Max number of times to retry a batch on recoverable errors. + MaxRetries int + // On recoverable errors, backoff exponentially. + MinBackoff time.Duration + MaxBackoff time.Duration +} + +// defaultQueueManagerConfig is the default remote queue configuration. +var defaultQueueManagerConfig = QueueManagerConfig{ + // With a maximum of 1000 shards, assuming an average of 100ms remote write + // time and 100 samples per batch, we will be able to push 1M samples/s. + MaxShards: 1000, + MaxSamplesPerSend: 100, + + // By default, buffer 1000 batches, which at 100ms per batch is 1:40mins. At + // 1000 shards, this will buffer 100M samples total. + QueueCapacity: 100 * 1000, + BatchSendDeadline: 5 * time.Second, + + // Max number of times to retry a batch on recoverable errors. + MaxRetries: 10, + MinBackoff: 30 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, +} + +// StorageClient defines an interface for sending a batch of samples to an +// external timeseries database. +type StorageClient interface { + // Store stores the given samples in the remote storage. + Store(model.Samples) error + // Name identifies the remote storage implementation. + Name() string +} + +// QueueManager manages a queue of samples to be sent to the Storage +// indicated by the provided StorageClient. +type QueueManager struct { + cfg QueueManagerConfig + externalLabels model.LabelSet + relabelConfigs []*config.RelabelConfig + client StorageClient + queueName string + logLimiter *rate.Limiter + + shardsMtx sync.Mutex + shards *shards + numShards int + reshardChan chan int + quit chan struct{} + wg sync.WaitGroup + + samplesIn, samplesOut, samplesOutDuration *ewmaRate + integralAccumulator float64 +} + +// NewQueueManager builds a new QueueManager. +func NewQueueManager(cfg QueueManagerConfig, externalLabels model.LabelSet, relabelConfigs []*config.RelabelConfig, client StorageClient) *QueueManager { + t := &QueueManager{ + cfg: cfg, + externalLabels: externalLabels, + relabelConfigs: relabelConfigs, + client: client, + queueName: client.Name(), + + logLimiter: rate.NewLimiter(logRateLimit, logBurst), + numShards: 1, + reshardChan: make(chan int), + quit: make(chan struct{}), + + samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), + samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration), + samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), + } + t.shards = t.newShards(t.numShards) + numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) + queueCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.QueueCapacity)) + + return t +} + +// Append queues a sample to be sent to the remote storage. It drops the +// sample on the floor if the queue is full. +// Always returns nil. +func (t *QueueManager) Append(s *model.Sample) error { + var snew model.Sample + snew = *s + snew.Metric = s.Metric.Clone() + + for ln, lv := range t.externalLabels { + if _, ok := s.Metric[ln]; !ok { + snew.Metric[ln] = lv + } + } + + snew.Metric = model.Metric( + relabel.Process(model.LabelSet(snew.Metric), t.relabelConfigs...)) + + if snew.Metric == nil { + return nil + } + + t.shardsMtx.Lock() + enqueued := t.shards.enqueue(&snew) + t.shardsMtx.Unlock() + + if enqueued { + queueLength.WithLabelValues(t.queueName).Inc() + } else { + droppedSamplesTotal.WithLabelValues(t.queueName).Inc() + if t.logLimiter.Allow() { + log.Warn("Remote storage queue full, discarding sample. Multiple subsequent messages of this kind may be suppressed.") + } + } + return nil +} + +// NeedsThrottling implements storage.SampleAppender. It will always return +// false as a remote storage drops samples on the floor if backlogging instead +// of asking for throttling. +func (*QueueManager) NeedsThrottling() bool { + return false +} + +// Start the queue manager sending samples to the remote storage. +// Does not block. +func (t *QueueManager) Start() { + t.wg.Add(2) + go t.updateShardsLoop() + go t.reshardLoop() + + t.shardsMtx.Lock() + defer t.shardsMtx.Unlock() + t.shards.start() +} + +// Stop stops sending samples to the remote storage and waits for pending +// sends to complete. +func (t *QueueManager) Stop() { + log.Infof("Stopping remote storage...") + close(t.quit) + t.wg.Wait() + + t.shardsMtx.Lock() + defer t.shardsMtx.Unlock() + t.shards.stop() + log.Info("Remote storage stopped.") +} + +func (t *QueueManager) updateShardsLoop() { + defer t.wg.Done() + + ticker := time.Tick(shardUpdateDuration) + for { + select { + case <-ticker: + t.calculateDesiredShards() + case <-t.quit: + return + } + } +} + +func (t *QueueManager) calculateDesiredShards() { + t.samplesIn.tick() + t.samplesOut.tick() + t.samplesOutDuration.tick() + + // We use the number of incoming samples as a prediction of how much work we + // will need to do next iteration. We add to this any pending samples + // (received - send) so we can catch up with any backlog. We use the average + // outgoing batch latency to work out how many shards we need. + var ( + samplesIn = t.samplesIn.rate() + samplesOut = t.samplesOut.rate() + samplesPending = samplesIn - samplesOut + samplesOutDuration = t.samplesOutDuration.rate() + ) + + // We use an integral accumulator, like in a PID, to help dampen oscillation. + t.integralAccumulator = t.integralAccumulator + (samplesPending * 0.1) + + if samplesOut <= 0 { + return + } + + var ( + timePerSample = samplesOutDuration / samplesOut + desiredShards = (timePerSample * (samplesIn + samplesPending + t.integralAccumulator)) / float64(time.Second) + ) + log.Debugf("QueueManager.caclulateDesiredShards samplesIn=%f, samplesOut=%f, samplesPending=%f, desiredShards=%f", + samplesIn, samplesOut, samplesPending, desiredShards) + + // Changes in the number of shards must be greater than shardToleranceFraction. + var ( + lowerBound = float64(t.numShards) * (1. - shardToleranceFraction) + upperBound = float64(t.numShards) * (1. + shardToleranceFraction) + ) + log.Debugf("QueueManager.updateShardsLoop %f <= %f <= %f", lowerBound, desiredShards, upperBound) + if lowerBound <= desiredShards && desiredShards <= upperBound { + return + } + + numShards := int(math.Ceil(desiredShards)) + if numShards > t.cfg.MaxShards { + numShards = t.cfg.MaxShards + } else if numShards < 1 { + numShards = 1 + } + if numShards == t.numShards { + return + } + + // Resharding can take some time, and we want this loop + // to stay close to shardUpdateDuration. + select { + case t.reshardChan <- numShards: + log.Infof("Remote storage resharding from %d to %d shards.", t.numShards, numShards) + t.numShards = numShards + default: + log.Infof("Currently resharding, skipping.") + } +} + +func (t *QueueManager) reshardLoop() { + defer t.wg.Done() + + for { + select { + case numShards := <-t.reshardChan: + t.reshard(numShards) + case <-t.quit: + return + } + } +} + +func (t *QueueManager) reshard(n int) { + numShards.WithLabelValues(t.queueName).Set(float64(n)) + + t.shardsMtx.Lock() + newShards := t.newShards(n) + oldShards := t.shards + t.shards = newShards + t.shardsMtx.Unlock() + + oldShards.stop() + + // We start the newShards after we have stopped (the therefore completely + // flushed) the oldShards, to guarantee we only every deliver samples in + // order. + newShards.start() +} + +type shards struct { + qm *QueueManager + queues []chan *model.Sample + done chan struct{} + wg sync.WaitGroup +} + +func (t *QueueManager) newShards(numShards int) *shards { + queues := make([]chan *model.Sample, numShards) + for i := 0; i < numShards; i++ { + queues[i] = make(chan *model.Sample, t.cfg.QueueCapacity) + } + s := &shards{ + qm: t, + queues: queues, + done: make(chan struct{}), + } + s.wg.Add(numShards) + return s +} + +func (s *shards) len() int { + return len(s.queues) +} + +func (s *shards) start() { + for i := 0; i < len(s.queues); i++ { + go s.runShard(i) + } +} + +func (s *shards) stop() { + for _, shard := range s.queues { + close(shard) + } + s.wg.Wait() +} + +func (s *shards) enqueue(sample *model.Sample) bool { + s.qm.samplesIn.incr(1) + + fp := sample.Metric.FastFingerprint() + shard := uint64(fp) % uint64(len(s.queues)) + + select { + case s.queues[shard] <- sample: + return true + default: + return false + } +} + +func (s *shards) runShard(i int) { + defer s.wg.Done() + queue := s.queues[i] + + // Send batches of at most MaxSamplesPerSend samples to the remote storage. + // If we have fewer samples than that, flush them out after a deadline + // anyways. + pendingSamples := model.Samples{} + + for { + select { + case sample, ok := <-queue: + if !ok { + if len(pendingSamples) > 0 { + log.Debugf("Flushing %d samples to remote storage...", len(pendingSamples)) + s.sendSamples(pendingSamples) + log.Debugf("Done flushing.") + } + return + } + + queueLength.WithLabelValues(s.qm.queueName).Dec() + pendingSamples = append(pendingSamples, sample) + + for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { + s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend]) + pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:] + } + case <-time.After(s.qm.cfg.BatchSendDeadline): + if len(pendingSamples) > 0 { + s.sendSamples(pendingSamples) + pendingSamples = pendingSamples[:0] + } + } + } +} + +func (s *shards) sendSamples(samples model.Samples) { + begin := time.Now() + s.sendSamplesWithBackoff(samples) + + // These counters are used to caclulate the dynamic sharding, and as such + // should be maintained irrespective of success or failure. + s.qm.samplesOut.incr(int64(len(samples))) + s.qm.samplesOutDuration.incr(int64(time.Since(begin))) +} + +// sendSamples to the remote storage with backoff for recoverable errors. +func (s *shards) sendSamplesWithBackoff(samples model.Samples) { + backoff := s.qm.cfg.MinBackoff + for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- { + begin := time.Now() + err := s.qm.client.Store(samples) + + sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds()) + if err == nil { + succeededSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) + return + } + + log.Warnf("Error sending %d samples to remote storage: %s", len(samples), err) + if _, ok := err.(recoverableError); !ok { + break + } + time.Sleep(backoff) + backoff = backoff * 2 + if backoff > s.qm.cfg.MaxBackoff { + backoff = s.qm.cfg.MaxBackoff + } + } + + failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) +} diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go new file mode 100644 index 0000000000..c97c00714e --- /dev/null +++ b/storage/remote/queue_manager_test.go @@ -0,0 +1,253 @@ +// Copyright 2013 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/prometheus/common/model" +) + +type TestStorageClient struct { + receivedSamples map[string]model.Samples + expectedSamples map[string]model.Samples + wg sync.WaitGroup + mtx sync.Mutex +} + +func NewTestStorageClient() *TestStorageClient { + return &TestStorageClient{ + receivedSamples: map[string]model.Samples{}, + expectedSamples: map[string]model.Samples{}, + } +} + +func (c *TestStorageClient) expectSamples(ss model.Samples) { + c.mtx.Lock() + defer c.mtx.Unlock() + + for _, s := range ss { + ts := s.Metric.String() + c.expectedSamples[ts] = append(c.expectedSamples[ts], s) + } + c.wg.Add(len(ss)) +} + +func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) { + c.wg.Wait() + + c.mtx.Lock() + defer c.mtx.Unlock() + for ts, expectedSamples := range c.expectedSamples { + for i, expected := range expectedSamples { + if !expected.Equal(c.receivedSamples[ts][i]) { + t.Fatalf("%d. Expected %v, got %v", i, expected, c.receivedSamples[ts][i]) + } + } + } +} + +func (c *TestStorageClient) Store(ss model.Samples) error { + c.mtx.Lock() + defer c.mtx.Unlock() + + for _, s := range ss { + ts := s.Metric.String() + c.receivedSamples[ts] = append(c.receivedSamples[ts], s) + } + c.wg.Add(-len(ss)) + return nil +} + +func (c *TestStorageClient) Name() string { + return "teststorageclient" +} + +func TestSampleDelivery(t *testing.T) { + // Let's create an even number of send batches so we don't run into the + // batch timeout case. + n := defaultQueueManagerConfig.QueueCapacity * 2 + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + }) + } + + c := NewTestStorageClient() + c.expectSamples(samples[:len(samples)/2]) + + cfg := defaultQueueManagerConfig + cfg.MaxShards = 1 + m := NewQueueManager(cfg, nil, nil, c) + + // These should be received by the client. + for _, s := range samples[:len(samples)/2] { + m.Append(s) + } + // These will be dropped because the queue is full. + for _, s := range samples[len(samples)/2:] { + m.Append(s) + } + m.Start() + defer m.Stop() + + c.waitForExpectedSamples(t) +} + +func TestSampleDeliveryOrder(t *testing.T) { + ts := 10 + n := defaultQueueManagerConfig.MaxSamplesPerSend * ts + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i%ts)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + Timestamp: model.Time(i), + }) + } + + c := NewTestStorageClient() + c.expectSamples(samples) + m := NewQueueManager(defaultQueueManagerConfig, nil, nil, c) + + // These should be received by the client. + for _, s := range samples { + m.Append(s) + } + m.Start() + defer m.Stop() + + c.waitForExpectedSamples(t) +} + +// TestBlockingStorageClient is a queue_manager StorageClient which will block +// on any calls to Store(), until the `block` channel is closed, at which point +// the `numCalls` property will contain a count of how many times Store() was +// called. +type TestBlockingStorageClient struct { + numCalls uint64 + block chan bool +} + +func NewTestBlockedStorageClient() *TestBlockingStorageClient { + return &TestBlockingStorageClient{ + block: make(chan bool), + numCalls: 0, + } +} + +func (c *TestBlockingStorageClient) Store(s model.Samples) error { + atomic.AddUint64(&c.numCalls, 1) + <-c.block + return nil +} + +func (c *TestBlockingStorageClient) NumCalls() uint64 { + return atomic.LoadUint64(&c.numCalls) +} + +func (c *TestBlockingStorageClient) unlock() { + close(c.block) +} + +func (c *TestBlockingStorageClient) Name() string { + return "testblockingstorageclient" +} + +func (t *QueueManager) queueLen() int { + t.shardsMtx.Lock() + defer t.shardsMtx.Unlock() + queueLength := 0 + for _, shard := range t.shards.queues { + queueLength += len(shard) + } + return queueLength +} + +func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { + // Our goal is to fully empty the queue: + // `MaxSamplesPerSend*Shards` samples should be consumed by the + // per-shard goroutines, and then another `MaxSamplesPerSend` + // should be left on the queue. + n := defaultQueueManagerConfig.MaxSamplesPerSend * 2 + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + }) + } + + c := NewTestBlockedStorageClient() + cfg := defaultQueueManagerConfig + cfg.MaxShards = 1 + cfg.QueueCapacity = n + m := NewQueueManager(cfg, nil, nil, c) + + m.Start() + + defer func() { + c.unlock() + m.Stop() + }() + + for _, s := range samples { + m.Append(s) + } + + // Wait until the runShard() loops drain the queue. If things went right, it + // should then immediately block in sendSamples(), but, in case of error, + // it would spawn too many goroutines, and thus we'd see more calls to + // client.Store() + // + // The timed wait is maybe non-ideal, but, in order to verify that we're + // not spawning too many concurrent goroutines, we have to wait on the + // Run() loop to consume a specific number of elements from the + // queue... and it doesn't signal that in any obvious way, except by + // draining the queue. We cap the waiting at 1 second -- that should give + // plenty of time, and keeps the failure fairly quick if we're not draining + // the queue properly. + for i := 0; i < 100 && m.queueLen() > 0; i++ { + time.Sleep(10 * time.Millisecond) + } + + if m.queueLen() != defaultQueueManagerConfig.MaxSamplesPerSend { + t.Fatalf("Failed to drain QueueManager queue, %d elements left", + m.queueLen(), + ) + } + + numCalls := c.NumCalls() + if numCalls != uint64(1) { + t.Errorf("Saw %d concurrent sends, expected 1", numCalls) + } +} diff --git a/storage/remote/read.go b/storage/remote/read.go new file mode 100644 index 0000000000..81d0776636 --- /dev/null +++ b/storage/remote/read.go @@ -0,0 +1,247 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "context" + "sort" + + "github.com/prometheus/common/model" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/storage" +) + +// Querier returns a new Querier on the storage. +func (r *Storage) Querier(mint, maxt int64) (storage.Querier, error) { + r.mtx.Lock() + defer r.mtx.Unlock() + + queriers := make([]storage.Querier, 0, len(r.clients)) + for _, c := range r.clients { + queriers = append(queriers, &querier{ + mint: mint, + maxt: maxt, + client: c, + externalLabels: r.externalLabels, + }) + } + return storage.NewMergeQuerier(queriers), nil +} + +// Querier is an adapter to make a Client usable as a storage.Querier. +type querier struct { + mint, maxt int64 + client *Client + externalLabels model.LabelSet +} + +// Select returns a set of series that matches the given label matchers. +func (q *querier) Select(matchers ...*labels.Matcher) storage.SeriesSet { + m, added := q.addExternalLabels(matchers) + + res, err := q.client.Read(context.TODO(), q.mint, q.maxt, labelMatchersToProto(m)) + if err != nil { + return errSeriesSet{err: err} + } + + series := make([]storage.Series, 0, len(res)) + for _, ts := range res { + labels := labelPairsToLabels(ts.Labels) + removeLabels(labels, added) + series = append(series, &concreteSeries{ + labels: labels, + samples: ts.Samples, + }) + } + sort.Sort(byLabel(series)) + return &concreteSeriesSet{ + series: series, + } +} + +func labelMatchersToProto(matchers []*labels.Matcher) []*prompb.LabelMatcher { + pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers)) + for _, m := range matchers { + var mType prompb.LabelMatcher_Type + switch m.Type { + case labels.MatchEqual: + mType = prompb.LabelMatcher_EQ + case labels.MatchNotEqual: + mType = prompb.LabelMatcher_NEQ + case labels.MatchRegexp: + mType = prompb.LabelMatcher_RE + case labels.MatchNotRegexp: + mType = prompb.LabelMatcher_NRE + default: + panic("invalid matcher type") + } + pbMatchers = append(pbMatchers, &prompb.LabelMatcher{ + Type: mType, + Name: string(m.Name), + Value: string(m.Value), + }) + } + return pbMatchers +} + +func labelPairsToLabels(labelPairs []*prompb.Label) labels.Labels { + result := make(labels.Labels, 0, len(labelPairs)) + for _, l := range labelPairs { + result = append(result, labels.Label{ + Name: l.Name, + Value: l.Value, + }) + } + sort.Sort(result) + return result +} + +type byLabel []storage.Series + +func (a byLabel) Len() int { return len(a) } +func (a byLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 } + +// LabelValues returns all potential values for a label name. +func (q *querier) LabelValues(name string) ([]string, error) { + // TODO implement? + return nil, nil +} + +// Close releases the resources of the Querier. +func (q *querier) Close() error { + return nil +} + +// errSeriesSet implements storage.SeriesSet, just returning an error. +type errSeriesSet struct { + err error +} + +func (errSeriesSet) Next() bool { + return false +} + +func (errSeriesSet) At() storage.Series { + return nil +} + +func (e errSeriesSet) Err() error { + return e.err +} + +// concreteSeriesSet implements storage.SeriesSet. +type concreteSeriesSet struct { + cur int + series []storage.Series +} + +func (c *concreteSeriesSet) Next() bool { + c.cur++ + return c.cur < len(c.series) +} + +func (c *concreteSeriesSet) At() storage.Series { + return c.series[c.cur] +} + +func (c *concreteSeriesSet) Err() error { + return nil +} + +// concreteSeries implementes storage.Series. +type concreteSeries struct { + labels labels.Labels + samples []*prompb.Sample +} + +func (c *concreteSeries) Labels() labels.Labels { + return c.labels +} + +func (c *concreteSeries) Iterator() storage.SeriesIterator { + return newConcreteSeriersIterator(c) +} + +// concreteSeriesIterator implements storage.SeriesIterator. +type concreteSeriesIterator struct { + cur int + series *concreteSeries +} + +func newConcreteSeriersIterator(series *concreteSeries) storage.SeriesIterator { + return &concreteSeriesIterator{ + cur: -1, + series: series, + } +} + +func (c *concreteSeriesIterator) Seek(t int64) bool { + c.cur = sort.Search(len(c.series.samples), func(n int) bool { + return c.series.samples[n].Timestamp >= t + }) + return c.cur < len(c.series.samples) +} + +func (c *concreteSeriesIterator) At() (t int64, v float64) { + s := c.series.samples[c.cur] + return s.Timestamp, s.Value +} + +func (c *concreteSeriesIterator) Next() bool { + c.cur++ + return c.cur < len(c.series.samples) +} + +func (c *concreteSeriesIterator) Err() error { + return nil +} + +// addExternalLabels adds matchers for each external label. External labels +// that already have a corresponding user-supplied matcher are skipped, as we +// assume that the user explicitly wants to select a different value for them. +// We return the new set of matchers, along with a map of labels for which +// matchers were added, so that these can later be removed from the result +// time series again. +func (q *querier) addExternalLabels(matchers []*labels.Matcher) ([]*labels.Matcher, model.LabelSet) { + el := make(model.LabelSet, len(q.externalLabels)) + for k, v := range q.externalLabels { + el[k] = v + } + for _, m := range matchers { + if _, ok := el[model.LabelName(m.Name)]; ok { + delete(el, model.LabelName(m.Name)) + } + } + for k, v := range el { + m, err := labels.NewMatcher(labels.MatchEqual, string(k), string(v)) + if err != nil { + panic(err) + } + matchers = append(matchers, m) + } + return matchers, el +} + +func removeLabels(l labels.Labels, toDelete model.LabelSet) { + for i := 0; i < len(l); { + if _, ok := toDelete[model.LabelName(l[i].Name)]; ok { + l = l[:i+copy(l[i:], l[i+1:])] + } else { + i++ + } + } +} diff --git a/storage/remote/read_test.go b/storage/remote/read_test.go new file mode 100644 index 0000000000..596ced5616 --- /dev/null +++ b/storage/remote/read_test.go @@ -0,0 +1,94 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "reflect" + "sort" + "testing" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" +) + +func mustNewLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher { + m, err := labels.NewMatcher(mt, name, val) + if err != nil { + panic(err) + } + return m +} + +func TestAddExternalLabels(t *testing.T) { + tests := []struct { + el model.LabelSet + inMatchers []*labels.Matcher + outMatchers []*labels.Matcher + added model.LabelSet + }{ + { + el: model.LabelSet{}, + inMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + }, + outMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + }, + added: model.LabelSet{}, + }, + { + el: model.LabelSet{"region": "europe", "dc": "berlin-01"}, + inMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + }, + outMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + mustNewLabelMatcher(labels.MatchEqual, "region", "europe"), + mustNewLabelMatcher(labels.MatchEqual, "dc", "berlin-01"), + }, + added: model.LabelSet{"region": "europe", "dc": "berlin-01"}, + }, + { + el: model.LabelSet{"region": "europe", "dc": "berlin-01"}, + inMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + mustNewLabelMatcher(labels.MatchEqual, "dc", "munich-02"), + }, + outMatchers: []*labels.Matcher{ + mustNewLabelMatcher(labels.MatchEqual, "job", "api-server"), + mustNewLabelMatcher(labels.MatchEqual, "region", "europe"), + mustNewLabelMatcher(labels.MatchEqual, "dc", "munich-02"), + }, + added: model.LabelSet{"region": "europe"}, + }, + } + + for i, test := range tests { + q := querier{ + externalLabels: test.el, + } + + matchers, added := q.addExternalLabels(test.inMatchers) + + sort.Slice(test.outMatchers, func(i, j int) bool { return test.outMatchers[i].Name < test.outMatchers[j].Name }) + sort.Slice(matchers, func(i, j int) bool { return matchers[i].Name < matchers[j].Name }) + + if !reflect.DeepEqual(matchers, test.outMatchers) { + t.Fatalf("%d. unexpected matchers; want %v, got %v", i, test.outMatchers, matchers) + } + if !reflect.DeepEqual(added, test.added) { + t.Fatalf("%d. unexpected added labels; want %v, got %v", i, test.added, added) + } + } +} diff --git a/storage/remote/storage.go b/storage/remote/storage.go new file mode 100644 index 0000000000..af4aa4f52f --- /dev/null +++ b/storage/remote/storage.go @@ -0,0 +1,103 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.package remote + +package remote + +import ( + "sync" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/config" +) + +// Storage represents all the remote read and write endpoints. It implements +// storage.Storage. +type Storage struct { + mtx sync.RWMutex + + // For writes + queues []*QueueManager + + // For reads + clients []*Client + externalLabels model.LabelSet +} + +// ApplyConfig updates the state as the new config requires. +func (s *Storage) ApplyConfig(conf *config.Config) error { + s.mtx.Lock() + defer s.mtx.Unlock() + + // Update write queues + + newQueues := []*QueueManager{} + // TODO: we should only stop & recreate queues which have changes, + // as this can be quite disruptive. + for i, rwConf := range conf.RemoteWriteConfigs { + c, err := NewClient(i, &clientConfig{ + url: rwConf.URL, + timeout: rwConf.RemoteTimeout, + httpClientConfig: rwConf.HTTPClientConfig, + }) + if err != nil { + return err + } + newQueues = append(newQueues, NewQueueManager( + defaultQueueManagerConfig, + conf.GlobalConfig.ExternalLabels, + rwConf.WriteRelabelConfigs, + c, + )) + } + + for _, q := range s.queues { + q.Stop() + } + + s.queues = newQueues + for _, q := range s.queues { + q.Start() + } + + // Update read clients + + clients := []*Client{} + for i, rrConf := range conf.RemoteReadConfigs { + c, err := NewClient(i, &clientConfig{ + url: rrConf.URL, + timeout: rrConf.RemoteTimeout, + httpClientConfig: rrConf.HTTPClientConfig, + }) + if err != nil { + return err + } + clients = append(clients, c) + } + + s.clients = clients + s.externalLabels = conf.GlobalConfig.ExternalLabels + + return nil +} + +// Close the background processing of the storage queues. +func (s *Storage) Close() error { + s.mtx.Lock() + defer s.mtx.Unlock() + + for _, q := range s.queues { + q.Stop() + } + + return nil +} diff --git a/storage/remote/write.go b/storage/remote/write.go new file mode 100644 index 0000000000..087ef8ca49 --- /dev/null +++ b/storage/remote/write.go @@ -0,0 +1,58 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" +) + +func (s *Storage) Appender() (storage.Appender, error) { + return s, nil +} + +func (s *Storage) Add(l labels.Labels, t int64, v float64) (string, error) { + s.mtx.RLock() + defer s.mtx.RUnlock() + for _, q := range s.queues { + q.Append(&model.Sample{ + Metric: labelsToMetric(l), + Timestamp: model.Time(t), + Value: model.SampleValue(v), + }) + } + return "", nil +} + +func labelsToMetric(ls labels.Labels) model.Metric { + metric := make(model.Metric, len(ls)) + for _, l := range ls { + metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) + } + return metric +} + +func (s *Storage) AddFast(l labels.Labels, _ string, t int64, v float64) error { + _, err := s.Add(l, t, v) + return err +} + +func (*Storage) Commit() error { + return nil +} + +func (*Storage) Rollback() error { + return nil +} diff --git a/storage/tsdb/tsdb.go b/storage/tsdb/tsdb.go index 09021fc5c0..63c4d52190 100644 --- a/storage/tsdb/tsdb.go +++ b/storage/tsdb/tsdb.go @@ -135,7 +135,7 @@ func (a appender) Add(lset labels.Labels, t int64, v float64) (string, error) { return ref, err } -func (a appender) AddFast(ref string, t int64, v float64) error { +func (a appender) AddFast(_ labels.Labels, ref string, t int64, v float64) error { err := a.a.AddFast(ref, t, v) switch errors.Cause(err) { diff --git a/vendor/golang.org/x/time/LICENSE b/vendor/golang.org/x/time/LICENSE new file mode 100644 index 0000000000..6a66aea5ea --- /dev/null +++ b/vendor/golang.org/x/time/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/time/PATENTS b/vendor/golang.org/x/time/PATENTS new file mode 100644 index 0000000000..733099041f --- /dev/null +++ b/vendor/golang.org/x/time/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/time/rate/rate.go b/vendor/golang.org/x/time/rate/rate.go new file mode 100644 index 0000000000..eabcd11474 --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate.go @@ -0,0 +1,380 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package rate provides a rate limiter. +package rate + +import ( + "fmt" + "math" + "sync" + "time" +) + +// Limit defines the maximum frequency of some events. +// Limit is represented as number of events per second. +// A zero Limit allows no events. +type Limit float64 + +// Inf is the infinite rate limit; it allows all events (even if burst is zero). +const Inf = Limit(math.MaxFloat64) + +// Every converts a minimum time interval between events to a Limit. +func Every(interval time.Duration) Limit { + if interval <= 0 { + return Inf + } + return 1 / Limit(interval.Seconds()) +} + +// A Limiter controls how frequently events are allowed to happen. +// It implements a "token bucket" of size b, initially full and refilled +// at rate r tokens per second. +// Informally, in any large enough time interval, the Limiter limits the +// rate to r tokens per second, with a maximum burst size of b events. +// As a special case, if r == Inf (the infinite rate), b is ignored. +// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. +// +// The zero value is a valid Limiter, but it will reject all events. +// Use NewLimiter to create non-zero Limiters. +// +// Limiter has three main methods, Allow, Reserve, and Wait. +// Most callers should use Wait. +// +// Each of the three methods consumes a single token. +// They differ in their behavior when no token is available. +// If no token is available, Allow returns false. +// If no token is available, Reserve returns a reservation for a future token +// and the amount of time the caller must wait before using it. +// If no token is available, Wait blocks until one can be obtained +// or its associated context.Context is canceled. +// +// The methods AllowN, ReserveN, and WaitN consume n tokens. +type Limiter struct { + limit Limit + burst int + + mu sync.Mutex + tokens float64 + // last is the last time the limiter's tokens field was updated + last time.Time + // lastEvent is the latest time of a rate-limited event (past or future) + lastEvent time.Time +} + +// Limit returns the maximum overall event rate. +func (lim *Limiter) Limit() Limit { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.limit +} + +// Burst returns the maximum burst size. Burst is the maximum number of tokens +// that can be consumed in a single call to Allow, Reserve, or Wait, so higher +// Burst values allow more events to happen at once. +// A zero Burst allows no events, unless limit == Inf. +func (lim *Limiter) Burst() int { + return lim.burst +} + +// NewLimiter returns a new Limiter that allows events up to rate r and permits +// bursts of at most b tokens. +func NewLimiter(r Limit, b int) *Limiter { + return &Limiter{ + limit: r, + burst: b, + } +} + +// Allow is shorthand for AllowN(time.Now(), 1). +func (lim *Limiter) Allow() bool { + return lim.AllowN(time.Now(), 1) +} + +// AllowN reports whether n events may happen at time now. +// Use this method if you intend to drop / skip events that exceed the rate limit. +// Otherwise use Reserve or Wait. +func (lim *Limiter) AllowN(now time.Time, n int) bool { + return lim.reserveN(now, n, 0).ok +} + +// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. +// A Reservation may be canceled, which may enable the Limiter to permit additional events. +type Reservation struct { + ok bool + lim *Limiter + tokens int + timeToAct time.Time + // This is the Limit at reservation time, it can change later. + limit Limit +} + +// OK returns whether the limiter can provide the requested number of tokens +// within the maximum wait time. If OK is false, Delay returns InfDuration, and +// Cancel does nothing. +func (r *Reservation) OK() bool { + return r.ok +} + +// Delay is shorthand for DelayFrom(time.Now()). +func (r *Reservation) Delay() time.Duration { + return r.DelayFrom(time.Now()) +} + +// InfDuration is the duration returned by Delay when a Reservation is not OK. +const InfDuration = time.Duration(1<<63 - 1) + +// DelayFrom returns the duration for which the reservation holder must wait +// before taking the reserved action. Zero duration means act immediately. +// InfDuration means the limiter cannot grant the tokens requested in this +// Reservation within the maximum wait time. +func (r *Reservation) DelayFrom(now time.Time) time.Duration { + if !r.ok { + return InfDuration + } + delay := r.timeToAct.Sub(now) + if delay < 0 { + return 0 + } + return delay +} + +// Cancel is shorthand for CancelAt(time.Now()). +func (r *Reservation) Cancel() { + r.CancelAt(time.Now()) + return +} + +// CancelAt indicates that the reservation holder will not perform the reserved action +// and reverses the effects of this Reservation on the rate limit as much as possible, +// considering that other reservations may have already been made. +func (r *Reservation) CancelAt(now time.Time) { + if !r.ok { + return + } + + r.lim.mu.Lock() + defer r.lim.mu.Unlock() + + if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { + return + } + + // calculate tokens to restore + // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved + // after r was obtained. These tokens should not be restored. + restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) + if restoreTokens <= 0 { + return + } + // advance time to now + now, _, tokens := r.lim.advance(now) + // calculate new number of tokens + tokens += restoreTokens + if burst := float64(r.lim.burst); tokens > burst { + tokens = burst + } + // update state + r.lim.last = now + r.lim.tokens = tokens + if r.timeToAct == r.lim.lastEvent { + prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) + if !prevEvent.Before(now) { + r.lim.lastEvent = prevEvent + } + } + + return +} + +// Reserve is shorthand for ReserveN(time.Now(), 1). +func (lim *Limiter) Reserve() *Reservation { + return lim.ReserveN(time.Now(), 1) +} + +// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. +// The Limiter takes this Reservation into account when allowing future events. +// ReserveN returns false if n exceeds the Limiter's burst size. +// Usage example: +// r := lim.ReserveN(time.Now(), 1) +// if !r.OK() { +// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? +// return +// } +// time.Sleep(r.Delay()) +// Act() +// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. +// If you need to respect a deadline or cancel the delay, use Wait instead. +// To drop or skip events exceeding rate limit, use Allow instead. +func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { + r := lim.reserveN(now, n, InfDuration) + return &r +} + +// contextContext is a temporary(?) copy of the context.Context type +// to support both Go 1.6 using golang.org/x/net/context and Go 1.7+ +// with the built-in context package. If people ever stop using Go 1.6 +// we can remove this. +type contextContext interface { + Deadline() (deadline time.Time, ok bool) + Done() <-chan struct{} + Err() error + Value(key interface{}) interface{} +} + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) wait(ctx contextContext) (err error) { + return lim.WaitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +// The burst limit is ignored if the rate limit is Inf. +func (lim *Limiter) waitN(ctx contextContext, n int) (err error) { + if n > lim.burst && lim.limit != Inf { + return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst) + } + // Check if ctx is already cancelled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Determine wait limit + now := time.Now() + waitLimit := InfDuration + if deadline, ok := ctx.Deadline(); ok { + waitLimit = deadline.Sub(now) + } + // Reserve + r := lim.reserveN(now, n, waitLimit) + if !r.ok { + return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) + } + // Wait + t := time.NewTimer(r.DelayFrom(now)) + defer t.Stop() + select { + case <-t.C: + // We can proceed. + return nil + case <-ctx.Done(): + // Context was canceled before we could proceed. Cancel the + // reservation, which may permit other events to proceed sooner. + r.Cancel() + return ctx.Err() + } +} + +// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). +func (lim *Limiter) SetLimit(newLimit Limit) { + lim.SetLimitAt(time.Now(), newLimit) +} + +// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated +// or underutilized by those which reserved (using Reserve or Wait) but did not yet act +// before SetLimitAt was called. +func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { + lim.mu.Lock() + defer lim.mu.Unlock() + + now, _, tokens := lim.advance(now) + + lim.last = now + lim.tokens = tokens + lim.limit = newLimit +} + +// reserveN is a helper method for AllowN, ReserveN, and WaitN. +// maxFutureReserve specifies the maximum reservation wait duration allowed. +// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. +func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { + lim.mu.Lock() + + if lim.limit == Inf { + lim.mu.Unlock() + return Reservation{ + ok: true, + lim: lim, + tokens: n, + timeToAct: now, + } + } + + now, last, tokens := lim.advance(now) + + // Calculate the remaining number of tokens resulting from the request. + tokens -= float64(n) + + // Calculate the wait duration + var waitDuration time.Duration + if tokens < 0 { + waitDuration = lim.limit.durationFromTokens(-tokens) + } + + // Decide result + ok := n <= lim.burst && waitDuration <= maxFutureReserve + + // Prepare reservation + r := Reservation{ + ok: ok, + lim: lim, + limit: lim.limit, + } + if ok { + r.tokens = n + r.timeToAct = now.Add(waitDuration) + } + + // Update state + if ok { + lim.last = now + lim.tokens = tokens + lim.lastEvent = r.timeToAct + } else { + lim.last = last + } + + lim.mu.Unlock() + return r +} + +// advance calculates and returns an updated state for lim resulting from the passage of time. +// lim is not changed. +func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { + last := lim.last + if now.Before(last) { + last = now + } + + // Avoid making delta overflow below when last is very old. + maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) + elapsed := now.Sub(last) + if elapsed > maxElapsed { + elapsed = maxElapsed + } + + // Calculate the new number of tokens, due to time that passed. + delta := lim.limit.tokensFromDuration(elapsed) + tokens := lim.tokens + delta + if burst := float64(lim.burst); tokens > burst { + tokens = burst + } + + return now, last, tokens +} + +// durationFromTokens is a unit conversion function from the number of tokens to the duration +// of time it takes to accumulate them at a rate of limit tokens per second. +func (limit Limit) durationFromTokens(tokens float64) time.Duration { + seconds := tokens / float64(limit) + return time.Nanosecond * time.Duration(1e9*seconds) +} + +// tokensFromDuration is a unit conversion function from a time duration to the number of tokens +// which could be accumulated during that duration at a rate of limit tokens per second. +func (limit Limit) tokensFromDuration(d time.Duration) float64 { + return d.Seconds() * float64(limit) +} diff --git a/vendor/golang.org/x/time/rate/rate_go16.go b/vendor/golang.org/x/time/rate/rate_go16.go new file mode 100644 index 0000000000..6bab1850f8 --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate_go16.go @@ -0,0 +1,21 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !go1.7 + +package rate + +import "golang.org/x/net/context" + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.waitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + return lim.waitN(ctx, n) +} diff --git a/vendor/golang.org/x/time/rate/rate_go17.go b/vendor/golang.org/x/time/rate/rate_go17.go new file mode 100644 index 0000000000..f90d85f51e --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate_go17.go @@ -0,0 +1,21 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build go1.7 + +package rate + +import "context" + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.waitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + return lim.waitN(ctx, n) +} diff --git a/vendor/vendor.json b/vendor/vendor.json index 8a96fd238b..7ac9a5eb9a 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -1097,6 +1097,12 @@ "revision": "c589d0c9f0d81640c518354c7bcae77d99820aa3", "revisionTime": "2016-09-30T00:14:02Z" }, + { + "checksumSHA1": "vGfePfr0+weQUeTM/71mu+LCFuE=", + "path": "golang.org/x/time/rate", + "revision": "8be79e1e0910c292df4e79c241bb7e8f7e725959", + "revisionTime": "2017-04-24T23:28:54Z" + }, { "checksumSHA1": "AjdmRXf0fiy6Bec9mNlsGsmZi1k=", "path": "google.golang.org/api/compute/v1",