diff --git a/coding/general.go b/coding/interface.go similarity index 100% rename from coding/general.go rename to coding/interface.go diff --git a/configuration/grammar/example.rules b/configuration/grammar/example.rules deleted file mode 100644 index 6dc1bce03a..0000000000 --- a/configuration/grammar/example.rules +++ /dev/null @@ -1,61 +0,0 @@ -// { -// set evaluation_interval = "30s"; -// target "http://www.example.com:80/metrics" - -// rule archived {name="instance:requests_total:sum"} = sum by (instance) {name="requests"}; -// rule archived {name="instance:requests-by_result_code_total:sum"} = -// sum by (instance,result_code) {name="requests"}; -// rule archived {name="instance:requests-by_result_code:sum"} = -// {name="instances:requests-by_result_code"} -// / by (instance) -// {name="instances:requests_total:sum"}; -// } - -{ - set evaluation_interval = “2m”; - - permanent { - rule { - labels { - set name = “process:request_rate_qps-allowed:sum”; - set job = “frontend”; - } - - // Values may be a literal or an expression. - set value = 500; - - // I wonder: Is it practical to express labels similar to above in a better DSL? - // set value = EXPRESSION … WITH LABELS {foo=”bar”}; - } - } - - rule { - // This provides a way of overriding existing labels, unsetting them, or - // appending new ones. - labels { - // “name” is obligatory. - set name = “process:requests_total:sum”; - } - - // Here we are extracting a metric with the name label value of “requests” from - // job “frontend” and merely accumulating this into a named variable. It is - // similar to standing. Each sum is keyed to the UNIX process and job from which - // it came. - set value = SUM({name=”requests”, job=”frontend”}) BY (process, job); - } - - rule { - // This provides a way of overriding existing labels, unsetting them, or - // appending new ones. - labels { - // “name” is obligatory. - set name = “process:request_qps:rate5m”; - } - - // Here we are extracting a metric with the name label value of “requests” from - // job “frontend” and merely accumulating this into a named variable. It is - // similar to standing. - set value = RATE({name=”process:requests_total:sum”, job=”frontend”} OVER “5m”); - } - } -} \ No newline at end of file diff --git a/configuration/grammar/lex.go b/configuration/grammar/lex.go deleted file mode 100644 index 430ac36023..0000000000 --- a/configuration/grammar/lex.go +++ /dev/null @@ -1,506 +0,0 @@ -package main - -import ( - "fmt" - "strings" - "unicode" - "unicode/utf8" -) - -type itemType int - -const ( - itemError itemType = iota - itemEof - itemRuleContextOpen - itemRuleContextClose - itemSetKeyword - itemKey - itemEqual - itemValue - itemSemicolon - itemQuote - itemRulesKeyword - itemRuleKeyword - itemRuleName - itemRuleValue - itemOpenBracket - itemCloseBracket -) - -const ( - literalCloseBracket = "}" - literalEof = -1 - literalOpenBracket = "{" - literalRule = "rule" - literalRules = "rules" - literalSet = "set" - literalEqual = "=" - literalQuote = `"` - literalSemicolon = ";" -) - -type element struct { - itemType itemType - value string -} - -func (e element) String() string { - switch e.itemType { - case itemError: - return e.value - case itemEof: - return "EOF" - } - return fmt.Sprintf("%s %q", e.itemType, e.value) -} - -type lexer struct { - input string - elements chan element - start int - position int - runeWidth int -} - -func lex(name, input string) (*lexer, chan element) { - l := &lexer{ - input: input, - elements: make(chan element), - } - go l.run() - return l, l.elements -} - -func (l *lexer) run() { - for state := lexBody; state != nil; { - state = state(l) - } - close(l.elements) -} - -func (l *lexer) next() (rune rune) { - if l.position >= len(l.input) { - l.runeWidth = 0 - return literalEof - } - - rune, l.runeWidth = utf8.DecodeRuneInString(l.input[l.position:]) - l.position += l.runeWidth - return rune -} - -func lexBody(l *lexer) lexFunction { - if strings.HasPrefix(l.input[l.position:], literalOpenBracket) { - return lexRulesetOpen - } - - switch rune := l.next(); { - case rune == literalEof: - l.emit(itemEof) - return nil - case isSpace(rune): - l.ignore() - default: - return l.errorf("illegal input") - } - return lexBody -} - -func lexRulesetOpen(l *lexer) lexFunction { - l.position += len(literalOpenBracket) - l.emit(itemRuleContextOpen) - - return lexRulesetInside -} - -func lexRulesetInside(l *lexer) lexFunction { - if strings.HasPrefix(l.input[l.position:], literalCloseBracket) { - return lexRulesetClose - } - - if strings.HasPrefix(l.input[l.position:], literalSet) { - return lexRuleSetKeyword - } - - if strings.HasPrefix(l.input[l.position:], literalRules) { - return lexRulesetRules - } - - switch rune := l.next(); { - case rune == literalEof: - return l.errorf("unterminated ruleset") - case isSpace(rune): - l.ignore() - case rune == ';': - l.ignore() - default: - return l.errorf("unrecognized input") - } - return lexRulesetInside -} - -func lexRulesetRules(l *lexer) lexFunction { - l.position += len(literalRules) - l.emit(itemRulesKeyword) - - return lexRulesetRulesBlockOpen -} - -func lexRulesetRulesBlockOpen(l *lexer) lexFunction { - if strings.HasPrefix(l.input[l.position:], literalOpenBracket) { - l.position += len(literalOpenBracket) - l.emit(itemOpenBracket) - return lexRulesetRulesBlockInside - } - - switch rune := l.next(); { - case isSpace(rune): - l.ignore() - default: - return l.errorf("unrecognized input") - } - - return lexRulesetRulesBlockOpen -} - -func lexRulesetRulesBlockInside(l *lexer) lexFunction { - if strings.HasPrefix(l.input[l.position:], literalRule) { - return lexRulesetRuleBegin - } - - if strings.HasPrefix(l.input[l.position:], literalCloseBracket) { - return lexRulesetRulesBlockClose - } - - switch rune := l.next(); { - case isSpace(rune): - l.ignore() - default: - return l.errorf("unrecognized input") - } - - return lexRulesetRulesBlockInside -} - -func lexRulesetRulesBlockClose(l *lexer) lexFunction { - l.position += len(literalCloseBracket) - l.emit(itemCloseBracket) - - return lexRulesetInside -} - -func lexRulesetRuleBegin(l *lexer) lexFunction { - l.position += len(literalRule) - l.emit(itemRuleKeyword) - - return lexRulesetRuleName -} - -func lexRulesetRuleName(l *lexer) lexFunction { - - switch rune := l.next(); { - case isSpace(rune): - l.ignore() - case isIdentifierOpen(rune): - for { - switch rune := l.next(); { - case isMetricIdentifier(rune): - case rune == '=': - l.backup() - l.emit(itemRuleName) - return lexRulesetRuleEqual - default: - return l.errorf("bad rule name") - } - } - default: - return l.errorf("unrecognized input") - } - - return lexRulesetRuleName -} - -func lexRulesetRuleEqual(l *lexer) lexFunction { - if strings.HasPrefix(l.input[l.position:], literalEqual) { - l.position += len(literalEqual) - l.emit(itemEqual) - return lexRulesetRuleDefinitionBegin - } - - switch rune := l.next(); { - case isSpace(rune): - l.ignore() - default: - return l.errorf("unrecognized input") - } - - return lexRulesetRuleEqual -} - -func lexRulesetRuleDefinitionBegin(l *lexer) lexFunction { - switch rune := l.next(); { - case isSpace(rune): - l.ignore() - case isIdentifierOpen(rune): - for { - switch rune := l.next(); { - case isMetricIdentifier(rune): - case rune == ';': - l.emit(itemRuleValue) - return lexRulesetRulesBlockInside - default: - return l.errorf("unrecognized input") - } - } - default: - return l.errorf("unrecognized input") - } - - return lexRulesetRuleDefinitionBegin -} - -func lexRuleSetKeyword(l *lexer) lexFunction { - l.position += len(literalSet) - - l.emit(itemSetKeyword) - - return lexRuleSetInside -} - -func (l *lexer) backup() { - l.position -= l.runeWidth -} - -func isIdentifierOpen(rune rune) bool { - switch rune := rune; { - case unicode.IsLetter(rune): - return true - case rune == '_': - return true - } - - return false -} - -func lexRuleSetInside(l *lexer) lexFunction { - switch rune := l.next(); { - case rune == literalEof: - return l.errorf("unterminated set statement") - case isSpace(rune): - l.ignore() - case rune == ';': - return l.errorf("unexpected ;") - case rune == '=': - return l.errorf("unexpected =") - case isIdentifierOpen(rune): - l.backup() - return lexRuleSetKey - default: - return l.errorf("unrecognized input") - } - - return lexRuleSetInside -} - -func isIdentifier(rune rune) bool { - switch rune := rune; { - case isIdentifierOpen(rune): - return true - case unicode.IsDigit(rune): - return true - } - return false -} - -func isMetricIdentifier(rune rune) bool { - switch rune := rune; { - case isIdentifier(rune): - return true - case rune == ':': - return true - } - - return false -} - -func (l *lexer) peek() rune { - rune := l.next() - l.backup() - return rune -} - -func (l *lexer) atTerminator() bool { - switch rune := l.peek(); { - case isSpace(rune): - return true - case rune == ';': - return true - } - - return false -} - -func lexRuleSetKey(l *lexer) lexFunction { - switch rune := l.next(); { - case rune == literalEof: - return l.errorf("incomplete set statement") - case isIdentifier(rune): - default: - l.backup() - if !l.atTerminator() { - return l.errorf("unexpected character %+U %q", rune, rune) - } - l.emit(itemKey) - return lexRuleSetEqual - } - return lexRuleSetKey -} - -func lexRuleSetEqual(l *lexer) lexFunction { - if strings.HasPrefix(l.input[l.position:], literalEqual) { - l.position += len(literalEqual) - l.emit(itemEqual) - return lexRuleSetValueOpenQuote - } - - switch rune := l.next(); { - case rune == literalEof: - return l.errorf("incomplete set statement") - case isSpace(rune): - l.ignore() - default: - return l.errorf("unexpected character %+U %q", rune, rune) - } - return lexRuleSetEqual -} - -func lexRuleSetValueOpenQuote(l *lexer) lexFunction { - if strings.HasPrefix(l.input[l.position:], literalQuote) { - l.position += len(literalQuote) - l.emit(itemQuote) - - return lexRuleSetValue - } - - switch rune := l.next(); { - case rune == literalEof: - return l.errorf("incomplete set statement") - case isSpace(rune): - l.ignore() - default: - return l.errorf("unexpected character %+U %q", rune, rune) - } - return lexRuleSetValueOpenQuote -} - -func lexRuleSetValue(l *lexer) lexFunction { - var lastRuneEscapes bool = false - - for { - rune := l.next() - { - if rune == '"' && !lastRuneEscapes { - l.backup() - l.emit(itemValue) - return lexRuleSetValueCloseQuote - } - - if !lastRuneEscapes && rune == '\\' { - lastRuneEscapes = true - } else { - lastRuneEscapes = false - } - } - } - - panic("unreachable") -} - -func lexRuleSetValueCloseQuote(l *lexer) lexFunction { - if strings.HasPrefix(l.input[l.position:], literalQuote) { - l.position += len(literalQuote) - l.emit(itemQuote) - - return lexRuleSetSemicolon - } - - switch rune := l.next(); { - case isSpace(rune): - l.ignore() - default: - return l.errorf("unexpected character %+U %q", rune, rune) - - } - return lexRuleSetValueCloseQuote - -} - -func lexRuleSetSemicolon(l *lexer) lexFunction { - if strings.HasPrefix(l.input[l.position:], literalSemicolon) { - l.position += len(literalSemicolon) - l.emit(itemSemicolon) - return lexRulesetInside - } - - switch rune := l.next(); { - case isSpace(rune): - l.ignore() - default: - return l.errorf("unexpected character %+U %q", rune, rune) - } - return lexRuleSetSemicolon -} - -func (l *lexer) ignore() { - l.start = l.position -} - -func (l *lexer) errorf(format string, args ...interface{}) lexFunction { - l.elements <- element{itemError, fmt.Sprintf(format, args...)} - return nil -} - -func isSpace(rune rune) bool { - switch rune { - case ' ', '\t', '\n', '\r': - return true - } - return false -} - -func lexRulesetClose(l *lexer) lexFunction { - l.position += len(literalCloseBracket) - l.emit(itemCloseBracket) - - return lexBody -} - -func (l *lexer) emit(i itemType) { - l.elements <- element{i, l.input[l.start:l.position]} - l.start = l.position -} - -type lexFunction func(*lexer) lexFunction - -func main() { - in := `{ - set evaluation_interval = "10m"; - - rules { - } - - - set name = "your mom"; - - - } - { - set evaluation_interval = "30m"; - }` - fmt.Println(in) - _, v := lex("", in) - for value := range v { - fmt.Println(value) - } -} diff --git a/configuration/grammar/lex_test.goold b/configuration/grammar/lex_test.goold deleted file mode 100644 index c2b1ac4e69..0000000000 --- a/configuration/grammar/lex_test.goold +++ /dev/null @@ -1,192 +0,0 @@ -package main - -import ( - "testing" -) - -type lexTest struct { - name string - input string - elements []element -} - -var ( - tEof = element{itemEof, ""} - tOpenRuleset = element{itemOpenBracket, "{"} - tCloseBracket = element{itemCloseBracket, "}"} - tIllegal = element{itemError, "illegal input"} - tSet = element{itemSetKeyword, "set"} - tEqual = element{itemEqual, "="} - tQuote = element{itemQuote, `"`} - tSemicolon = element{itemSemicolon, ";"} - tRules = element{itemRulesKeyword, "rules"} -) - -var lexTests = []lexTest{ - { - "empty", - "", - []element{ - tEof, - }, - }, - { - "empty with new line", - "\n\n", - []element{ - tEof, - }, - }, - { - "one empty ruleset", - "{}", - []element{ - tOpenRuleset, - tCloseBracket, - tEof, - }, - }, - { - "one empty ruleset distributed over new line", - "{\n}", - []element{ - tOpenRuleset, - tCloseBracket, - tEof, - }, - }, - { - "two empty rulesets", - "{} {}", - []element{ - tOpenRuleset, - tCloseBracket, - tOpenRuleset, - tCloseBracket, - tEof, - }, - }, - { - "two empty rulesets distributed over new line", - "{}\n{}", - []element{ - tOpenRuleset, - tCloseBracket, - tOpenRuleset, - tCloseBracket, - tEof, - }, - }, - { - "garbage", - "garbage", - []element{tIllegal}, - }, - { - "one set", - `{ set foo = "bar"; }`, - []element{ - tOpenRuleset, - tSet, - element{itemKey, "foo"}, - tEqual, - tQuote, - element{itemValue, "bar"}, - tQuote, - tSemicolon, - tCloseBracket, - tEof, - }, - }, - { - "one set over multiple lines", - `{ - set - foo - = - "bar" - ; - }`, - []element{tOpenRuleset, tSet, element{itemKey, "foo"}, tEqual, tQuote, element{itemValue, "bar"}, tQuote, tSemicolon, tCloseBracket, tEof}, - }, - { - "two sets", - `{ set foo = "bar";set baz = "qux"; }`, - []element{ - tOpenRuleset, - tSet, - element{itemKey, "foo"}, - tEqual, - tQuote, - element{itemValue, "bar"}, - tQuote, - tSemicolon, - tSet, - element{itemKey, "baz"}, - tEqual, - tQuote, - element{itemValue, "qux"}, - tQuote, - tSemicolon, - tCloseBracket, - tEof, - }, - }, - { - "two over multiple lines", - `{ set foo = "bar"; - set -baz -= -"qux" -; -}`, - []element{ - tOpenRuleset, - tSet, - element{itemKey, "foo"}, - tEqual, - tQuote, - element{itemValue, "bar"}, - tQuote, - tSemicolon, - tSet, - element{itemKey, "baz"}, - tEqual, - tQuote, - element{itemValue, "qux"}, - tQuote, - tSemicolon, - tCloseBracket, - tEof, - }, - }, -} - -func collect(l *lexTest) []element { - _, v := lex("", l.input) - - emission := make([]element, 0) - - for i := range v { - emission = append(emission, i) - } - - return emission -} - -func TestFoo(t *testing.T) { - for _, test := range lexTests { - e := collect(&test) - - if len(e) != len(test.elements) { - t.Errorf("%s: got\n\n\t%v\nexpected\n\n\t%v", test.name, e, test.elements) - } - - for i, _ := range test.elements { - if test.elements[i] != e[i] { - t.Errorf("%s[%d]: got\n\n\t%v\nexpected\n\n\t%v", test.name, i, e[i], test.elements[i]) - } - } - } -} diff --git a/main.go b/main.go index a1d4dd0b83..7ca2bd91bf 100644 --- a/main.go +++ b/main.go @@ -2,11 +2,12 @@ package main import ( "code.google.com/p/gorest" + "github.com/matttproud/prometheus/storage/metric/leveldb" "net/http" ) func main() { - m, _ := NewLevigoMetricPersistence("/tmp/metrics") + m, _ := leveldb.NewLevigoMetricPersistence("/tmp/metrics") s := &MetricsService{ persistence: m, } diff --git a/metric_persistence.go b/metric_persistence.go deleted file mode 100644 index ffc0668aea..0000000000 --- a/metric_persistence.go +++ /dev/null @@ -1,15 +0,0 @@ -package main - -type MetricPersistence interface { - Close() error - AppendSample(sample *Sample) error - - GetLabelNames() ([]string, error) - GetLabelPairs() ([]LabelPairs, error) - GetMetrics() ([]LabelPairs, error) - - GetMetricFingerprintsForLabelPairs(labelSets []*LabelPairs) ([]*Fingerprint, error) - RecordLabelNameFingerprint(sample *Sample) error - RecordFingerprintWatermark(sample *Sample) error - GetFingerprintLabelPairs(fingerprint Fingerprint) (LabelPairs, error) -} diff --git a/metric.go b/model/metric.go similarity index 98% rename from metric.go rename to model/metric.go index 0d8110c5a8..71b2c02ec9 100644 --- a/metric.go +++ b/model/metric.go @@ -1,4 +1,4 @@ -package main +package model import ( "crypto/md5" diff --git a/service.go b/service.go index fa268dc7a6..a4f91aedcc 100644 --- a/service.go +++ b/service.go @@ -2,18 +2,20 @@ package main import ( "code.google.com/p/gorest" + "github.com/matttproud/prometheus/model" + "github.com/matttproud/prometheus/storage/metric/leveldb" ) type MetricsService struct { gorest.RestService `root:"/" consumes:"application/json" produces:"application/json"` - persistence *LevigoMetricPersistence + persistence *leveldb.LevigoMetricPersistence listLabels gorest.EndPoint `method:"GET" path:"/labels/" output:"[]string"` - listLabelPairs gorest.EndPoint `method:"GET" path:"/label-pairs/" output:"[]LabelPairs"` - listMetrics gorest.EndPoint `method:"GET" path:"/metrics/" output:"[]LabelPairs"` + listLabelPairs gorest.EndPoint `method:"GET" path:"/label-pairs/" output:"[]model.LabelPairs"` + listMetrics gorest.EndPoint `method:"GET" path:"/metrics/" output:"[]model.LabelPairs"` - appendSample gorest.EndPoint `method:"POST" path:"/metrics/" postdata:"Sample"` + appendSample gorest.EndPoint `method:"POST" path:"/metrics/" postdata:"model.Sample"` } func (m MetricsService) ListLabels() []string { @@ -26,7 +28,7 @@ func (m MetricsService) ListLabels() []string { return labels } -func (m MetricsService) ListLabelPairs() []LabelPairs { +func (m MetricsService) ListLabelPairs() []model.LabelPairs { labelPairs, labelPairsError := m.persistence.GetLabelPairs() if labelPairsError != nil { @@ -36,7 +38,7 @@ func (m MetricsService) ListLabelPairs() []LabelPairs { return labelPairs } -func (m MetricsService) ListMetrics() []LabelPairs { +func (m MetricsService) ListMetrics() []model.LabelPairs { metrics, metricsError := m.persistence.GetMetrics() if metricsError != nil { @@ -46,7 +48,7 @@ func (m MetricsService) ListMetrics() []LabelPairs { return metrics } -func (m MetricsService) AppendSample(s Sample) { +func (m MetricsService) AppendSample(s model.Sample) { responseBuilder := m.ResponseBuilder() if appendError := m.persistence.AppendSample(&s); appendError == nil { responseBuilder.SetResponseCode(200) diff --git a/storage/metric/interface.go b/storage/metric/interface.go new file mode 100644 index 0000000000..70c26e2f51 --- /dev/null +++ b/storage/metric/interface.go @@ -0,0 +1,19 @@ +package metric + +import ( + "github.com/matttproud/prometheus/model" +) + +type MetricPersistence interface { + Close() error + AppendSample(sample *model.Sample) error + + GetLabelNames() ([]string, error) + GetLabelPairs() ([]model.LabelPairs, error) + GetMetrics() ([]model.LabelPairs, error) + + GetMetricFingerprintsForLabelPairs(labelSets []*model.LabelPairs) ([]*model.Fingerprint, error) + RecordLabelNameFingerprint(sample *model.Sample) error + RecordFingerprintWatermark(sample *model.Sample) error + GetFingerprintLabelPairs(fingerprint model.Fingerprint) (model.LabelPairs, error) +} diff --git a/levigo_metric_persistence.go b/storage/metric/leveldb/leveldb.go similarity index 88% rename from levigo_metric_persistence.go rename to storage/metric/leveldb/leveldb.go index 0833c91cc6..3c8208a465 100644 --- a/levigo_metric_persistence.go +++ b/storage/metric/leveldb/leveldb.go @@ -1,4 +1,4 @@ -package main +package leveldb import ( "code.google.com/p/goprotobuf/proto" @@ -6,30 +6,34 @@ import ( "fmt" "github.com/matttproud/prometheus/coding" "github.com/matttproud/prometheus/coding/indexable" + "github.com/matttproud/prometheus/model" data "github.com/matttproud/prometheus/model/generated" + index "github.com/matttproud/prometheus/storage/raw/index/leveldb" + storage "github.com/matttproud/prometheus/storage/raw/leveldb" "github.com/matttproud/prometheus/utility" + "io" "log" "sort" ) -type pendingArchival map[int64]float64 - type LevigoMetricPersistence struct { - fingerprintHighWaterMarks *LevigoPersistence - fingerprintLabelPairs *LevigoPersistence - fingerprintLowWaterMarks *LevigoPersistence - fingerprintSamples *LevigoPersistence - labelNameFingerprints *LevigoPersistence - labelPairFingerprints *LevigoPersistence - metricMembershipIndex *LevigoMembershipIndex + fingerprintHighWaterMarks *storage.LevigoPersistence + fingerprintLabelPairs *storage.LevigoPersistence + fingerprintLowWaterMarks *storage.LevigoPersistence + fingerprintSamples *storage.LevigoPersistence + labelNameFingerprints *storage.LevigoPersistence + labelPairFingerprints *storage.LevigoPersistence + metricMembershipIndex *index.LevigoMembershipIndex } +type levigoOpener func() + func (l *LevigoMetricPersistence) Close() error { log.Printf("Closing LevigoPersistence storage containers...") var persistences = []struct { name string - closer LevigoCloser + closer io.Closer }{ { "Fingerprint High-Water Marks", @@ -94,8 +98,6 @@ func (l *LevigoMetricPersistence) Close() error { return nil } -type levigoOpener func() - func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, error) { log.Printf("Opening LevigoPersistence storage containers...") @@ -111,7 +113,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, "High-Water Marks by Fingerprint", func() { var anomaly error - emission.fingerprintHighWaterMarks, anomaly = NewLevigoPersistence(baseDirectory+"/high_water_marks_by_fingerprint", 1000000, 10) + emission.fingerprintHighWaterMarks, anomaly = storage.NewLevigoPersistence(baseDirectory+"/high_water_marks_by_fingerprint", 1000000, 10) errorChannel <- anomaly }, }, @@ -119,7 +121,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, "Label Names and Value Pairs by Fingerprint", func() { var anomaly error - emission.fingerprintLabelPairs, anomaly = NewLevigoPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10) + emission.fingerprintLabelPairs, anomaly = storage.NewLevigoPersistence(baseDirectory+"/label_name_and_value_pairs_by_fingerprint", 1000000, 10) errorChannel <- anomaly }, }, @@ -127,7 +129,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, "Low-Water Marks by Fingerprint", func() { var anomaly error - emission.fingerprintLowWaterMarks, anomaly = NewLevigoPersistence(baseDirectory+"/low_water_marks_by_fingerprint", 1000000, 10) + emission.fingerprintLowWaterMarks, anomaly = storage.NewLevigoPersistence(baseDirectory+"/low_water_marks_by_fingerprint", 1000000, 10) errorChannel <- anomaly }, }, @@ -135,7 +137,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, "Samples by Fingerprint", func() { var anomaly error - emission.fingerprintSamples, anomaly = NewLevigoPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10) + emission.fingerprintSamples, anomaly = storage.NewLevigoPersistence(baseDirectory+"/samples_by_fingerprint", 1000000, 10) errorChannel <- anomaly }, }, @@ -143,7 +145,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, "Fingerprints by Label Name", func() { var anomaly error - emission.labelNameFingerprints, anomaly = NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10) + emission.labelNameFingerprints, anomaly = storage.NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name", 1000000, 10) errorChannel <- anomaly }, }, @@ -151,7 +153,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, "Fingerprints by Label Name and Value Pair", func() { var anomaly error - emission.labelPairFingerprints, anomaly = NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10) + emission.labelPairFingerprints, anomaly = storage.NewLevigoPersistence(baseDirectory+"/fingerprints_by_label_name_and_value_pair", 1000000, 10) errorChannel <- anomaly }, }, @@ -159,7 +161,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, "Metric Membership Index", func() { var anomaly error - emission.metricMembershipIndex, anomaly = NewLevigoMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10) + emission.metricMembershipIndex, anomaly = index.NewLevigoMembershipIndex(baseDirectory+"/metric_membership_index", 1000000, 10) errorChannel <- anomaly }, }, @@ -190,7 +192,7 @@ func NewLevigoMetricPersistence(baseDirectory string) (*LevigoMetricPersistence, return emission, nil } -func ddoFromSample(sample *Sample) *data.MetricDDO { +func ddoFromSample(sample *model.Sample) *data.MetricDDO { labelNames := make([]string, 0, len(sample.Labels)) for labelName, _ := range sample.Labels { @@ -218,7 +220,7 @@ func ddoFromSample(sample *Sample) *data.MetricDDO { return metricDDO } -func ddoFromMetric(metric Metric) *data.MetricDDO { +func ddoFromMetric(metric model.Metric) *data.MetricDDO { labelNames := make([]string, 0, len(metric)) for labelName, _ := range metric { @@ -266,7 +268,7 @@ func (l *LevigoMetricPersistence) indexMetric(ddo *data.MetricDDO) error { func fingerprintDDOForMessage(message proto.Message) (*data.FingerprintDDO, error) { if messageByteArray, marshalError := proto.Marshal(message); marshalError == nil { - fingerprint := FingerprintFromByteArray(messageByteArray) + fingerprint := model.FingerprintFromByteArray(messageByteArray) return &data.FingerprintDDO{ Signature: proto.String(string(fingerprint)), }, nil @@ -431,7 +433,7 @@ func (l *LevigoMetricPersistence) appendFingerprints(ddo *data.MetricDDO) error return errors.New("Unknown error in appending label pairs to fingerprint.") } -func (l *LevigoMetricPersistence) AppendSample(sample *Sample) error { +func (l *LevigoMetricPersistence) AppendSample(sample *model.Sample) error { fmt.Printf("Sample: %q\n", sample) metricDDO := ddoFromSample(sample) @@ -501,14 +503,14 @@ func (l *LevigoMetricPersistence) GetLabelNames() ([]string, error) { return nil, errors.New("Unknown error encountered when querying label names.") } -func (l *LevigoMetricPersistence) GetLabelPairs() ([]LabelPairs, error) { +func (l *LevigoMetricPersistence) GetLabelPairs() ([]model.LabelPairs, error) { if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil { - result := make([]LabelPairs, 0, len(getAll)) + result := make([]model.LabelPairs, 0, len(getAll)) labelPairDDO := &data.LabelPairDDO{} for _, pair := range getAll { if unmarshalError := proto.Unmarshal(pair.Left, labelPairDDO); unmarshalError == nil { - item := LabelPairs{ + item := model.LabelPairs{ *labelPairDDO.Name: *labelPairDDO.Value, } result = append(result, item) @@ -526,12 +528,12 @@ func (l *LevigoMetricPersistence) GetLabelPairs() ([]LabelPairs, error) { return nil, errors.New("Unknown error encountered when querying label pairs.") } -func (l *LevigoMetricPersistence) GetMetrics() ([]LabelPairs, error) { +func (l *LevigoMetricPersistence) GetMetrics() ([]model.LabelPairs, error) { log.Printf("GetMetrics()\n") if getAll, getAllError := l.labelPairFingerprints.GetAll(); getAllError == nil { log.Printf("getAll: %q\n", getAll) - result := make([]LabelPairs, 0) + result := make([]model.LabelPairs, 0) fingerprintCollection := &data.FingerprintCollectionDDO{} fingerprints := make(utility.Set) @@ -552,7 +554,7 @@ func (l *LevigoMetricPersistence) GetMetrics() ([]LabelPairs, error) { labelPairCollectionDDO := &data.LabelPairCollectionDDO{} if labelPairCollectionDDOMarshalError := proto.Unmarshal(labelPairCollectionRaw, labelPairCollectionDDO); labelPairCollectionDDOMarshalError == nil { - intermediate := make(LabelPairs, 0) + intermediate := make(model.LabelPairs, 0) for _, member := range labelPairCollectionDDO.Member { intermediate[*member.Name] = *member.Value @@ -581,7 +583,7 @@ func (l *LevigoMetricPersistence) GetMetrics() ([]LabelPairs, error) { return nil, errors.New("Unknown error encountered when querying metrics.") } -func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric Metric) (*Interval, int, error) { +func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric model.Metric) (*model.Interval, int, error) { metricDDO := ddoFromMetric(metric) if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil { @@ -602,7 +604,7 @@ func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric Metric) (*Interv var foundEntries int = 0 if *fingerprintDDO.Signature == *found.Fingerprint.Signature { - emission := &Interval{ + emission := &model.Interval{ OldestInclusive: indexable.DecodeTime(found.Timestamp), NewestInclusive: indexable.DecodeTime(found.Timestamp), } @@ -622,7 +624,7 @@ func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric Metric) (*Interv } return emission, foundEntries, nil } else { - return &Interval{}, -6, nil + return &model.Interval{}, -6, nil } } else { log.Printf("Could not de-serialize start key: %q\n", unmarshalErr) @@ -655,7 +657,7 @@ func (l *LevigoMetricPersistence) GetWatermarksForMetric(metric Metric) (*Interv // TODO(mtp): Holes in the data! -func (l *LevigoMetricPersistence) GetSamplesForMetric(metric Metric, interval Interval) ([]Samples, error) { +func (l *LevigoMetricPersistence) GetSamplesForMetric(metric model.Metric, interval model.Interval) ([]model.Samples, error) { metricDDO := ddoFromMetric(metric) if fingerprintDDO, fingerprintDDOErr := fingerprintDDOForMessage(metricDDO); fingerprintDDOErr == nil { @@ -667,7 +669,7 @@ func (l *LevigoMetricPersistence) GetSamplesForMetric(metric Metric, interval In Timestamp: indexable.EncodeTime(interval.OldestInclusive), } - emission := make([]Samples, 0) + emission := make([]model.Samples, 0) if encode, encodeErr := coding.NewProtocolBufferEncoder(start).Encode(); encodeErr == nil { iterator.Seek(encode) @@ -680,8 +682,8 @@ func (l *LevigoMetricPersistence) GetSamplesForMetric(metric Metric, interval In if *fingerprintDDO.Signature == *key.Fingerprint.Signature { // Wart if indexable.DecodeTime(key.Timestamp).Unix() <= interval.NewestInclusive.Unix() { - emission = append(emission, Samples{ - Value: SampleValue(*value.Value), + emission = append(emission, model.Samples{ + Value: model.SampleValue(*value.Value), Timestamp: indexable.DecodeTime(key.Timestamp), }) } else { diff --git a/levigo_metric_persistence_test.go b/storage/metric/leveldb/leveldb_test.go similarity index 95% rename from levigo_metric_persistence_test.go rename to storage/metric/leveldb/leveldb_test.go index b54a4caec2..73ba1e6b3a 100644 --- a/levigo_metric_persistence_test.go +++ b/storage/metric/leveldb/leveldb_test.go @@ -1,8 +1,9 @@ -package main +package leveldb import ( "code.google.com/p/goprotobuf/proto" "fmt" + "github.com/matttproud/prometheus/model" data "github.com/matttproud/prometheus/model/generated" "io/ioutil" "math" @@ -168,10 +169,10 @@ func TestAppendSampleAsPureSparseAppend(t *testing.T) { }() appendSample := func(x int) bool { - sample := &Sample{ - Value: SampleValue(float32(x)), + sample := &model.Sample{ + Value: model.SampleValue(float32(x)), Timestamp: time.Unix(int64(x), int64(x)), - Labels: LabelPairs{string(x): string(x)}, + Labels: model.LabelPairs{string(x): string(x)}, } appendErr := persistence.AppendSample(sample) @@ -200,10 +201,10 @@ func TestAppendSampleAsSparseAppendWithReads(t *testing.T) { }() appendSample := func(x int) bool { - sample := &Sample{ - Value: SampleValue(float32(x)), + sample := &model.Sample{ + Value: model.SampleValue(float32(x)), Timestamp: time.Unix(int64(x), int64(x)), - Labels: LabelPairs{string(x): string(x)}, + Labels: model.LabelPairs{string(x): string(x)}, } appendErr := persistence.AppendSample(sample) @@ -293,10 +294,10 @@ func TestAppendSampleAsPureSingleEntityAppend(t *testing.T) { }() appendSample := func(x int) bool { - sample := &Sample{ - Value: SampleValue(float32(x)), + sample := &model.Sample{ + Value: model.SampleValue(float32(x)), Timestamp: time.Unix(int64(x), 0), - Labels: LabelPairs{"name": "my_metric"}, + Labels: model.LabelPairs{"name": "my_metric"}, } appendErr := persistence.AppendSample(sample) @@ -338,8 +339,8 @@ func TestStochastic(t *testing.T) { metricNewestSample := make(map[int]int64) for metricIndex := 0; metricIndex < numberOfMetrics; metricIndex++ { - sample := &Sample{ - Labels: LabelPairs{}, + sample := &model.Sample{ + Labels: model.LabelPairs{}, } sample.Labels["name"] = fmt.Sprintf("metric_index_%d", metricIndex) @@ -381,7 +382,7 @@ func TestStochastic(t *testing.T) { for sampleIndex := 0; sampleIndex < numberOfSamples; sampleIndex++ { sample.Timestamp = time.Unix(nextTimestamp(), 0) - sample.Value = SampleValue(sampleIndex) + sample.Value = model.SampleValue(sampleIndex) appendErr := persistence.AppendSample(sample) @@ -504,7 +505,7 @@ func TestStochastic(t *testing.T) { } } - metric := make(Metric) + metric := make(model.Metric) metric["name"] = fmt.Sprintf("metric_index_%d", metricIndex) @@ -565,7 +566,7 @@ func TestStochastic(t *testing.T) { end = second } - interval := Interval{ + interval := model.Interval{ OldestInclusive: time.Unix(begin, 0), NewestInclusive: time.Unix(end, 0), } diff --git a/index.go b/storage/raw/index/interface.go similarity index 93% rename from index.go rename to storage/raw/index/interface.go index ad203d9baa..c60e56ea5b 100644 --- a/index.go +++ b/storage/raw/index/interface.go @@ -1,4 +1,4 @@ -package main +package index import ( "github.com/matttproud/prometheus/coding" diff --git a/levigo_index.go b/storage/raw/index/leveldb/leveldb.go similarity index 74% rename from levigo_index.go rename to storage/raw/index/leveldb/leveldb.go index c2da77bd2d..18f38c7b01 100644 --- a/levigo_index.go +++ b/storage/raw/index/leveldb/leveldb.go @@ -1,8 +1,9 @@ -package main +package leveldb import ( "github.com/matttproud/prometheus/coding" data "github.com/matttproud/prometheus/model/generated" + "github.com/matttproud/prometheus/storage/raw/leveldb" ) var ( @@ -10,7 +11,7 @@ var ( ) type LevigoMembershipIndex struct { - persistence *LevigoPersistence + persistence *leveldb.LevigoPersistence } func (l *LevigoMembershipIndex) Close() error { @@ -30,10 +31,10 @@ func (l *LevigoMembershipIndex) Put(key coding.Encoder) error { } func NewLevigoMembershipIndex(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevigoMembershipIndex, error) { - var levigoPersistence *LevigoPersistence + var levigoPersistence *leveldb.LevigoPersistence var levigoPersistenceError error - if levigoPersistence, levigoPersistenceError = NewLevigoPersistence(storageRoot, cacheCapacity, bitsPerBloomFilterEncoded); levigoPersistenceError == nil { + if levigoPersistence, levigoPersistenceError = leveldb.NewLevigoPersistence(storageRoot, cacheCapacity, bitsPerBloomFilterEncoded); levigoPersistenceError == nil { levigoMembershipIndex := &LevigoMembershipIndex{ persistence: levigoPersistence, } diff --git a/persistence.go b/storage/raw/interface.go similarity index 96% rename from persistence.go rename to storage/raw/interface.go index 0ec743124f..74fcb312f7 100644 --- a/persistence.go +++ b/storage/raw/interface.go @@ -1,4 +1,4 @@ -package main +package raw import ( "github.com/matttproud/prometheus/coding" diff --git a/levigo_persistence.go b/storage/raw/leveldb/leveldb.go similarity index 86% rename from levigo_persistence.go rename to storage/raw/leveldb/leveldb.go index 62c2a478e4..4e16f6070a 100644 --- a/levigo_persistence.go +++ b/storage/raw/leveldb/leveldb.go @@ -1,15 +1,12 @@ -package main +package leveldb import ( "github.com/jmhodges/levigo" "github.com/matttproud/prometheus/coding" + "github.com/matttproud/prometheus/storage/raw" "io" ) -type LevigoCloser interface { - Close() error -} - type LevigoPersistence struct { cache *levigo.Cache filterPolicy *levigo.FilterPolicy @@ -19,6 +16,13 @@ type LevigoPersistence struct { writeOptions *levigo.WriteOptions } +type iteratorCloser struct { + iterator *levigo.Iterator + readOptions *levigo.ReadOptions + snapshot *levigo.Snapshot + storage *levigo.DB +} + func NewLevigoPersistence(storageRoot string, cacheCapacity, bitsPerBloomFilterEncoded int) (*LevigoPersistence, error) { options := levigo.NewOptions() options.SetCreateIfMissing(true) @@ -87,14 +91,13 @@ func (l *LevigoPersistence) Close() error { } func (l *LevigoPersistence) Get(value coding.Encoder) ([]byte, error) { - var key []byte - var keyError error - - if key, keyError = value.Encode(); keyError == nil { + if key, keyError := value.Encode(); keyError == nil { return l.storage.Get(l.readOptions, key) + } else { + return nil, keyError } - return nil, keyError + panic("unreachable") } func (l *LevigoPersistence) Has(value coding.Encoder) (bool, error) { @@ -108,26 +111,20 @@ func (l *LevigoPersistence) Has(value coding.Encoder) (bool, error) { } func (l *LevigoPersistence) Drop(value coding.Encoder) error { - var key []byte - var keyError error - - if key, keyError = value.Encode(); keyError == nil { + if key, keyError := value.Encode(); keyError == nil { if deleteError := l.storage.Delete(l.writeOptions, key); deleteError != nil { return deleteError } - - return nil + } else { + return keyError } - return keyError + return nil } func (l *LevigoPersistence) Put(key, value coding.Encoder) error { - var keyEncoded []byte - var keyError error - - if keyEncoded, keyError = key.Encode(); keyError == nil { + if keyEncoded, keyError := key.Encode(); keyError == nil { if valueEncoded, valueError := value.Encode(); valueError == nil { if putError := l.storage.Put(l.writeOptions, keyEncoded, valueEncoded); putError != nil { @@ -136,14 +133,14 @@ func (l *LevigoPersistence) Put(key, value coding.Encoder) error { } else { return valueError } - - return nil + } else { + return keyError } - return keyError + return nil } -func (l *LevigoPersistence) GetAll() ([]Pair, error) { +func (l *LevigoPersistence) GetAll() ([]raw.Pair, error) { snapshot := l.storage.NewSnapshot() defer l.storage.ReleaseSnapshot(snapshot) readOptions := levigo.NewReadOptions() @@ -154,10 +151,10 @@ func (l *LevigoPersistence) GetAll() ([]Pair, error) { defer iterator.Close() iterator.SeekToFirst() - result := make([]Pair, 0) + result := make([]raw.Pair, 0) for iterator := iterator; iterator.Valid(); iterator.Next() { - result = append(result, Pair{Left: iterator.Key(), Right: iterator.Value()}) + result = append(result, raw.Pair{Left: iterator.Key(), Right: iterator.Value()}) } iteratorError := iterator.GetError() @@ -169,13 +166,6 @@ func (l *LevigoPersistence) GetAll() ([]Pair, error) { return result, nil } -type iteratorCloser struct { - iterator *levigo.Iterator - readOptions *levigo.ReadOptions - snapshot *levigo.Snapshot - storage *levigo.DB -} - func (i *iteratorCloser) Close() error { defer func() { if i.storage != nil {