diff --git a/storage/local/chunk.go b/storage/local/chunk.go index b292a378fb..0075a1a377 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -28,26 +28,27 @@ import ( ) // DefaultChunkEncoding can be changed via a flag. -var DefaultChunkEncoding = doubleDelta +var DefaultChunkEncoding = DoubleDelta var errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries") -type chunkEncoding byte +// ChunkEncoding defintes which encoding we are using, delta, doubledelta, or varbit +type ChunkEncoding byte // String implements flag.Value. -func (ce chunkEncoding) String() string { +func (ce ChunkEncoding) String() string { return fmt.Sprintf("%d", ce) } // Set implements flag.Value. -func (ce *chunkEncoding) Set(s string) error { +func (ce *ChunkEncoding) Set(s string) error { switch s { case "0": - *ce = delta + *ce = Delta case "1": - *ce = doubleDelta + *ce = DoubleDelta case "2": - *ce = varbit + *ce = Varbit default: return fmt.Errorf("invalid chunk encoding: %s", s) } @@ -55,12 +56,15 @@ func (ce *chunkEncoding) Set(s string) error { } const ( - delta chunkEncoding = iota - doubleDelta - varbit + // Delta encoding + Delta ChunkEncoding = iota + // DoubleDelta encoding + DoubleDelta + // Varbit encoding + Varbit ) -// chunkDesc contains meta-data for a chunk. Pay special attention to the +// ChunkDesc contains meta-data for a chunk. Pay special attention to the // documented requirements for calling its methods concurrently (WRT pinning and // locking). The doc comments spell out the requirements for each method, but // here is an overview and general explanation: @@ -88,9 +92,9 @@ const ( // is populated upon creation of a chunkDesc, so it is alway safe to call // firstTime. The firstTime method is arguably not needed and only there for // consistency with lastTime. -type chunkDesc struct { +type ChunkDesc struct { sync.Mutex // Protects pinning. - c chunk // nil if chunk is evicted. + c Chunk // nil if chunk is evicted. rCnt int chunkFirstTime model.Time // Populated at creation. Immutable. chunkLastTime model.Time // Populated on closing of the chunk, model.Earliest if unset. @@ -101,14 +105,14 @@ type chunkDesc struct { evictListElement *list.Element } -// newChunkDesc creates a new chunkDesc pointing to the provided chunk. The +// NewChunkDesc creates a new chunkDesc pointing to the provided chunk. The // provided chunk is assumed to be not persisted yet. Therefore, the refCount of // the new chunkDesc is 1 (preventing eviction prior to persisting). -func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc { +func NewChunkDesc(c Chunk, firstTime model.Time) *ChunkDesc { chunkOps.WithLabelValues(createAndPin).Inc() atomic.AddInt64(&numMemChunks, 1) numMemChunkDescs.Inc() - return &chunkDesc{ + return &ChunkDesc{ c: c, rCnt: 1, chunkFirstTime: firstTime, @@ -116,18 +120,18 @@ func newChunkDesc(c chunk, firstTime model.Time) *chunkDesc { } } -// add adds a sample pair to the underlying chunk. For safe concurrent access, +// Add adds a sample pair to the underlying chunk. For safe concurrent access, // The chunk must be pinned, and the caller must have locked the fingerprint of // the series. -func (cd *chunkDesc) add(s model.SamplePair) ([]chunk, error) { - return cd.c.add(s) +func (cd *ChunkDesc) Add(s model.SamplePair) ([]Chunk, error) { + return cd.c.Add(s) } // pin increments the refCount by one. Upon increment from 0 to 1, this // chunkDesc is removed from the evict list. To enable the latter, the // evictRequests channel has to be provided. This method can be called // concurrently at any time. -func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) { +func (cd *ChunkDesc) pin(evictRequests chan<- evictRequest) { cd.Lock() defer cd.Unlock() @@ -142,7 +146,7 @@ func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) { // chunkDesc is added to the evict list. To enable the latter, the evictRequests // channel has to be provided. This method can be called concurrently at any // time. -func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) { +func (cd *ChunkDesc) unpin(evictRequests chan<- evictRequest) { cd.Lock() defer cd.Unlock() @@ -158,7 +162,7 @@ func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) { // refCount returns the number of pins. This method can be called concurrently // at any time. -func (cd *chunkDesc) refCount() int { +func (cd *ChunkDesc) refCount() int { cd.Lock() defer cd.Unlock() @@ -169,18 +173,18 @@ func (cd *chunkDesc) refCount() int { // can be called concurrently at any time. It only returns the immutable // cd.chunkFirstTime without any locking. Arguably, this method is // useless. However, it provides consistency with the lastTime method. -func (cd *chunkDesc) firstTime() model.Time { +func (cd *ChunkDesc) firstTime() model.Time { return cd.chunkFirstTime } // lastTime returns the timestamp of the last sample in the chunk. For safe // concurrent access, this method requires the fingerprint of the time series to // be locked. -func (cd *chunkDesc) lastTime() (model.Time, error) { +func (cd *ChunkDesc) lastTime() (model.Time, error) { if cd.chunkLastTime != model.Earliest || cd.c == nil { return cd.chunkLastTime, nil } - return cd.c.newIterator().lastTimestamp() + return cd.c.NewIterator().LastTimestamp() } // maybePopulateLastTime populates the chunkLastTime from the underlying chunk @@ -188,9 +192,9 @@ func (cd *chunkDesc) lastTime() (model.Time, error) { // last sample to a chunk or after closing a head chunk due to age. For safe // concurrent access, the chunk must be pinned, and the caller must have locked // the fingerprint of the series. -func (cd *chunkDesc) maybePopulateLastTime() error { +func (cd *ChunkDesc) maybePopulateLastTime() error { if cd.chunkLastTime == model.Earliest && cd.c != nil { - t, err := cd.c.newIterator().lastTimestamp() + t, err := cd.c.NewIterator().LastTimestamp() if err != nil { return err } @@ -201,7 +205,7 @@ func (cd *chunkDesc) maybePopulateLastTime() error { // isEvicted returns whether the chunk is evicted. For safe concurrent access, // the caller must have locked the fingerprint of the series. -func (cd *chunkDesc) isEvicted() bool { +func (cd *ChunkDesc) isEvicted() bool { // Locking required here because we do not want the caller to force // pinning the chunk first, so it could be evicted while this method is // called. @@ -214,7 +218,7 @@ func (cd *chunkDesc) isEvicted() bool { // setChunk sets the underlying chunk. The caller must have locked the // fingerprint of the series and must have "pre-pinned" the chunk (i.e. first // call pin and then set the chunk). -func (cd *chunkDesc) setChunk(c chunk) { +func (cd *ChunkDesc) setChunk(c Chunk) { if cd.c != nil { panic("chunk already set") } @@ -224,7 +228,7 @@ func (cd *chunkDesc) setChunk(c chunk) { // maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk // is now evicted, which includes the case that the chunk was evicted even // before this method was called. It can be called concurrently at any time. -func (cd *chunkDesc) maybeEvict() bool { +func (cd *ChunkDesc) maybeEvict() bool { cd.Lock() defer cd.Unlock() @@ -244,133 +248,134 @@ func (cd *chunkDesc) maybeEvict() bool { return true } -// chunk is the interface for all chunks. Chunks are generally not +// Chunk is the interface for all chunks. Chunks are generally not // goroutine-safe. -type chunk interface { +type Chunk interface { // add adds a SamplePair to the chunks, performs any necessary // re-encoding, and adds any necessary overflow chunks. It returns the // new version of the original chunk, followed by overflow chunks, if // any. The first chunk returned might be the same as the original one // or a newly allocated version. In any case, take the returned chunk as // the relevant one and discard the original chunk. - add(sample model.SamplePair) ([]chunk, error) - clone() chunk - firstTime() model.Time - newIterator() chunkIterator - marshal(io.Writer) error - marshalToBuf([]byte) error - unmarshal(io.Reader) error - unmarshalFromBuf([]byte) error - encoding() chunkEncoding + Add(sample model.SamplePair) ([]Chunk, error) + Clone() Chunk + FirstTime() model.Time + NewIterator() ChunkIterator + Marshal(io.Writer) error + MarshalToBuf([]byte) error + Unmarshal(io.Reader) error + UnmarshalFromBuf([]byte) error + Encoding() ChunkEncoding } -// A chunkIterator enables efficient access to the content of a chunk. It is +// ChunkIterator enables efficient access to the content of a chunk. It is // generally not safe to use a chunkIterator concurrently with or after chunk // mutation. -type chunkIterator interface { +type ChunkIterator interface { // Gets the last timestamp in the chunk. - lastTimestamp() (model.Time, error) + LastTimestamp() (model.Time, error) // Whether a given timestamp is contained between first and last value // in the chunk. - contains(model.Time) (bool, error) + Contains(model.Time) (bool, error) // Scans the next value in the chunk. Directly after the iterator has // been created, the next value is the first value in the // chunk. Otherwise, it is the value following the last value scanned or // found (by one of the find... methods). Returns false if either the // end of the chunk is reached or an error has occurred. - scan() bool + Scan() bool // Finds the most recent value at or before the provided time. Returns // false if either the chunk contains no value at or before the provided // time, or an error has occurred. - findAtOrBefore(model.Time) bool + FindAtOrBefore(model.Time) bool // Finds the oldest value at or after the provided time. Returns false // if either the chunk contains no value at or after the provided time, // or an error has occurred. - findAtOrAfter(model.Time) bool + FindAtOrAfter(model.Time) bool // Returns the last value scanned (by the scan method) or found (by one // of the find... methods). It returns ZeroSamplePair before any of // those methods were called. - value() model.SamplePair + Value() model.SamplePair // Returns the last error encountered. In general, an error signals data // corruption in the chunk and requires quarantining. - err() error + Err() error } // rangeValues is a utility function that retrieves all values within the given // range from a chunkIterator. -func rangeValues(it chunkIterator, in metric.Interval) ([]model.SamplePair, error) { +func rangeValues(it ChunkIterator, in metric.Interval) ([]model.SamplePair, error) { result := []model.SamplePair{} - if !it.findAtOrAfter(in.OldestInclusive) { - return result, it.err() + if !it.FindAtOrAfter(in.OldestInclusive) { + return result, it.Err() } - for !it.value().Timestamp.After(in.NewestInclusive) { - result = append(result, it.value()) - if !it.scan() { + for !it.Value().Timestamp.After(in.NewestInclusive) { + result = append(result, it.Value()) + if !it.Scan() { break } } - return result, it.err() + return result, it.Err() } // addToOverflowChunk is a utility function that creates a new chunk as overflow // chunk, adds the provided sample to it, and returns a chunk slice containing // the provided old chunk followed by the new overflow chunk. -func addToOverflowChunk(c chunk, s model.SamplePair) ([]chunk, error) { - overflowChunks, err := newChunk().add(s) +func addToOverflowChunk(c Chunk, s model.SamplePair) ([]Chunk, error) { + overflowChunks, err := NewChunk().Add(s) if err != nil { return nil, err } - return []chunk{c, overflowChunks[0]}, nil + return []Chunk{c, overflowChunks[0]}, nil } // transcodeAndAdd is a utility function that transcodes the dst chunk into the // provided src chunk (plus the necessary overflow chunks) and then adds the // provided sample. It returns the new chunks (transcoded plus overflow) with // the new sample at the end. -func transcodeAndAdd(dst chunk, src chunk, s model.SamplePair) ([]chunk, error) { +func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error) { chunkOps.WithLabelValues(transcode).Inc() var ( head = dst - body, newChunks []chunk + body, NewChunks []Chunk err error ) - it := src.newIterator() - for it.scan() { - if newChunks, err = head.add(it.value()); err != nil { + it := src.NewIterator() + for it.Scan() { + if NewChunks, err = head.Add(it.Value()); err != nil { return nil, err } - body = append(body, newChunks[:len(newChunks)-1]...) - head = newChunks[len(newChunks)-1] + body = append(body, NewChunks[:len(NewChunks)-1]...) + head = NewChunks[len(NewChunks)-1] } - if it.err() != nil { - return nil, it.err() + if it.Err() != nil { + return nil, it.Err() } - if newChunks, err = head.add(s); err != nil { + if NewChunks, err = head.Add(s); err != nil { return nil, err } - return append(body, newChunks...), nil + return append(body, NewChunks...), nil } -// newChunk creates a new chunk according to the encoding set by the +// NewChunk creates a new chunk according to the encoding set by the // DefaultChunkEncoding flag. -func newChunk() chunk { - chunk, err := newChunkForEncoding(DefaultChunkEncoding) +func NewChunk() Chunk { + chunk, err := NewChunkForEncoding(DefaultChunkEncoding) if err != nil { panic(err) } return chunk } -func newChunkForEncoding(encoding chunkEncoding) (chunk, error) { +// NewChunkForEncoding allows configuring what chunk type you want +func NewChunkForEncoding(encoding ChunkEncoding) (Chunk, error) { switch encoding { - case delta: + case Delta: return newDeltaEncodedChunk(d1, d0, true, chunkLen), nil - case doubleDelta: + case DoubleDelta: return newDoubleDeltaEncodedChunk(d1, d0, true, chunkLen), nil - case varbit: + case Varbit: return newVarbitChunk(varbitZeroEncoding), nil default: return nil, fmt.Errorf("unknown chunk encoding: %v", encoding) @@ -403,18 +408,18 @@ func newIndexAccessingChunkIterator(len int, acc indexAccessor) *indexAccessingC } // lastTimestamp implements chunkIterator. -func (it *indexAccessingChunkIterator) lastTimestamp() (model.Time, error) { +func (it *indexAccessingChunkIterator) LastTimestamp() (model.Time, error) { return it.acc.timestampAtIndex(it.len - 1), it.acc.err() } // contains implements chunkIterator. -func (it *indexAccessingChunkIterator) contains(t model.Time) (bool, error) { +func (it *indexAccessingChunkIterator) Contains(t model.Time) (bool, error) { return !t.Before(it.acc.timestampAtIndex(0)) && !t.After(it.acc.timestampAtIndex(it.len-1)), it.acc.err() } // scan implements chunkIterator. -func (it *indexAccessingChunkIterator) scan() bool { +func (it *indexAccessingChunkIterator) Scan() bool { it.pos++ if it.pos >= it.len { return false @@ -427,7 +432,7 @@ func (it *indexAccessingChunkIterator) scan() bool { } // findAtOrBefore implements chunkIterator. -func (it *indexAccessingChunkIterator) findAtOrBefore(t model.Time) bool { +func (it *indexAccessingChunkIterator) FindAtOrBefore(t model.Time) bool { i := sort.Search(it.len, func(i int) bool { return it.acc.timestampAtIndex(i).After(t) }) @@ -443,7 +448,7 @@ func (it *indexAccessingChunkIterator) findAtOrBefore(t model.Time) bool { } // findAtOrAfter implements chunkIterator. -func (it *indexAccessingChunkIterator) findAtOrAfter(t model.Time) bool { +func (it *indexAccessingChunkIterator) FindAtOrAfter(t model.Time) bool { i := sort.Search(it.len, func(i int) bool { return !it.acc.timestampAtIndex(i).Before(t) }) @@ -459,11 +464,11 @@ func (it *indexAccessingChunkIterator) findAtOrAfter(t model.Time) bool { } // value implements chunkIterator. -func (it *indexAccessingChunkIterator) value() model.SamplePair { +func (it *indexAccessingChunkIterator) Value() model.SamplePair { return it.lastValue } // err implements chunkIterator. -func (it *indexAccessingChunkIterator) err() error { +func (it *indexAccessingChunkIterator) Err() error { return it.acc.err() } diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index 2f695f10b7..09eb395003 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -114,7 +114,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint ) } s.chunkDescs = append( - make([]*chunkDesc, 0, len(s.chunkDescs)-s.persistWatermark), + make([]*ChunkDesc, 0, len(s.chunkDescs)-s.persistWatermark), s.chunkDescs[s.persistWatermark:]..., ) numMemChunkDescs.Sub(float64(s.persistWatermark)) diff --git a/storage/local/delta.go b/storage/local/delta.go index 2fb7d3d764..e2e0dc0a63 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -73,7 +73,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod } // add implements chunk. -func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { +func (c deltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { // TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation. if c.len() == 0 { c = c[:deltaHeaderBytes] @@ -174,23 +174,23 @@ func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) } } - return []chunk{&c}, nil + return []Chunk{&c}, nil } // clone implements chunk. -func (c deltaEncodedChunk) clone() chunk { +func (c deltaEncodedChunk) Clone() Chunk { clone := make(deltaEncodedChunk, len(c), cap(c)) copy(clone, c) return &clone } // firstTime implements chunk. -func (c deltaEncodedChunk) firstTime() model.Time { +func (c deltaEncodedChunk) FirstTime() model.Time { return c.baseTime() } -// newIterator implements chunk. -func (c *deltaEncodedChunk) newIterator() chunkIterator { +// NewIterator implements chunk. +func (c *deltaEncodedChunk) NewIterator() ChunkIterator { return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{ c: *c, baseT: c.baseTime(), @@ -202,7 +202,7 @@ func (c *deltaEncodedChunk) newIterator() chunkIterator { } // marshal implements chunk. -func (c deltaEncodedChunk) marshal(w io.Writer) error { +func (c deltaEncodedChunk) Marshal(w io.Writer) error { if len(c) > math.MaxUint16 { panic("chunk buffer length would overflow a 16 bit uint.") } @@ -218,8 +218,8 @@ func (c deltaEncodedChunk) marshal(w io.Writer) error { return nil } -// marshalToBuf implements chunk. -func (c deltaEncodedChunk) marshalToBuf(buf []byte) error { +// MarshalToBuf implements chunk. +func (c deltaEncodedChunk) MarshalToBuf(buf []byte) error { if len(c) > math.MaxUint16 { panic("chunk buffer length would overflow a 16 bit uint") } @@ -233,7 +233,7 @@ func (c deltaEncodedChunk) marshalToBuf(buf []byte) error { } // unmarshal implements chunk. -func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { +func (c *deltaEncodedChunk) Unmarshal(r io.Reader) error { *c = (*c)[:cap(*c)] if _, err := io.ReadFull(r, *c); err != nil { return err @@ -250,7 +250,7 @@ func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { } // unmarshalFromBuf implements chunk. -func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) error { +func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { *c = (*c)[:cap(*c)] copy(*c, buf) l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:]) @@ -265,7 +265,7 @@ func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) error { } // encoding implements chunk. -func (c deltaEncodedChunk) encoding() chunkEncoding { return delta } +func (c deltaEncodedChunk) Encoding() ChunkEncoding { return Delta } func (c deltaEncodedChunk) timeBytes() deltaBytes { return deltaBytes(c[deltaHeaderTimeBytesOffset]) diff --git a/storage/local/delta_test.go b/storage/local/delta_test.go index 7ad083d62a..8f63c208f7 100644 --- a/storage/local/delta_test.go +++ b/storage/local/delta_test.go @@ -53,13 +53,13 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) { cases := []struct { chunkTypeName string - chunkConstructor func(deltaBytes, deltaBytes, bool, int) chunk + chunkConstructor func(deltaBytes, deltaBytes, bool, int) Chunk minHeaderLen int chunkLenPos int }{ { chunkTypeName: "deltaEncodedChunk", - chunkConstructor: func(a, b deltaBytes, c bool, d int) chunk { + chunkConstructor: func(a, b deltaBytes, c bool, d int) Chunk { return newDeltaEncodedChunk(a, b, c, d) }, minHeaderLen: deltaHeaderBytes, @@ -67,7 +67,7 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) { }, { chunkTypeName: "doubleDeltaEncodedChunk", - chunkConstructor: func(a, b deltaBytes, c bool, d int) chunk { + chunkConstructor: func(a, b deltaBytes, c bool, d int) Chunk { return newDoubleDeltaEncodedChunk(a, b, c, d) }, minHeaderLen: doubleDeltaHeaderMinBytes, @@ -77,7 +77,7 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) { for _, c := range cases { chunk := c.chunkConstructor(d1, d4, false, chunkLen) - cs, err := chunk.add(model.SamplePair{ + cs, err := chunk.Add(model.SamplePair{ Timestamp: model.Now(), Value: model.SampleValue(100), }) @@ -87,16 +87,16 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) { buf := make([]byte, chunkLen) - cs[0].marshalToBuf(buf) + cs[0].MarshalToBuf(buf) // Corrupt the length to be every possible too-small value for i := 0; i < c.minHeaderLen; i++ { binary.LittleEndian.PutUint16(buf[c.chunkLenPos:], uint16(i)) - err = cs[0].unmarshalFromBuf(buf) + err = cs[0].UnmarshalFromBuf(buf) verifyUnmarshallingError(err, c.chunkTypeName, "buf", i) - err = cs[0].unmarshal(bytes.NewBuffer(buf)) + err = cs[0].Unmarshal(bytes.NewBuffer(buf)) verifyUnmarshallingError(err, c.chunkTypeName, "Reader", i) } } diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index 41a1471d6c..e8f3cecef7 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -81,7 +81,7 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub } // add implements chunk. -func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { +func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { // TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation. if c.len() == 0 { return c.addFirstSample(s), nil @@ -181,23 +181,23 @@ func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) { return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb) } } - return []chunk{&c}, nil + return []Chunk{&c}, nil } // clone implements chunk. -func (c doubleDeltaEncodedChunk) clone() chunk { +func (c doubleDeltaEncodedChunk) Clone() Chunk { clone := make(doubleDeltaEncodedChunk, len(c), cap(c)) copy(clone, c) return &clone } // firstTime implements chunk. -func (c doubleDeltaEncodedChunk) firstTime() model.Time { +func (c doubleDeltaEncodedChunk) FirstTime() model.Time { return c.baseTime() } -// newIterator implements chunk. -func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { +// NewIterator( implements chunk. +func (c *doubleDeltaEncodedChunk) NewIterator() ChunkIterator { return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{ c: *c, baseT: c.baseTime(), @@ -211,7 +211,7 @@ func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { } // marshal implements chunk. -func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error { +func (c doubleDeltaEncodedChunk) Marshal(w io.Writer) error { if len(c) > math.MaxUint16 { panic("chunk buffer length would overflow a 16 bit uint") } @@ -227,8 +227,8 @@ func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error { return nil } -// marshalToBuf implements chunk. -func (c doubleDeltaEncodedChunk) marshalToBuf(buf []byte) error { +// MarshalToBuf implements chunk. +func (c doubleDeltaEncodedChunk) MarshalToBuf(buf []byte) error { if len(c) > math.MaxUint16 { panic("chunk buffer length would overflow a 16 bit uint") } @@ -242,7 +242,7 @@ func (c doubleDeltaEncodedChunk) marshalToBuf(buf []byte) error { } // unmarshal implements chunk. -func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error { +func (c *doubleDeltaEncodedChunk) Unmarshal(r io.Reader) error { *c = (*c)[:cap(*c)] if _, err := io.ReadFull(r, *c); err != nil { return err @@ -260,7 +260,7 @@ func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error { } // unmarshalFromBuf implements chunk. -func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) error { +func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error { *c = (*c)[:cap(*c)] copy(*c, buf) l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:]) @@ -275,7 +275,7 @@ func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) error { } // encoding implements chunk. -func (c doubleDeltaEncodedChunk) encoding() chunkEncoding { return doubleDelta } +func (c doubleDeltaEncodedChunk) Encoding() ChunkEncoding { return DoubleDelta } func (c doubleDeltaEncodedChunk) baseTime() model.Time { return model.Time( @@ -347,7 +347,7 @@ func (c doubleDeltaEncodedChunk) isInt() bool { // addFirstSample is a helper method only used by c.add(). It adds timestamp and // value as base time and value. -func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []chunk { +func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []Chunk { c = c[:doubleDeltaHeaderBaseValueOffset+8] binary.LittleEndian.PutUint64( c[doubleDeltaHeaderBaseTimeOffset:], @@ -357,12 +357,12 @@ func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []chunk { c[doubleDeltaHeaderBaseValueOffset:], math.Float64bits(float64(s.Value)), ) - return []chunk{&c} + return []Chunk{&c} } // addSecondSample is a helper method only used by c.add(). It calculates the // base delta from the provided sample and adds it to the chunk. -func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]chunk, error) { +func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]Chunk, error) { baseTimeDelta := s.Timestamp - c.baseTime() if baseTimeDelta < 0 { return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta) @@ -403,7 +403,7 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb delt math.Float64bits(float64(baseValueDelta)), ) } - return []chunk{&c}, nil + return []Chunk{&c}, nil } // doubleDeltaEncodedIndexAccessor implements indexAccessor. diff --git a/storage/local/heads.go b/storage/local/heads.go index 60176a1fc6..d8f0198005 100644 --- a/storage/local/heads.go +++ b/storage/local/heads.go @@ -107,7 +107,7 @@ func (hs *headsScanner) scan() bool { firstTime int64 lastTime int64 encoding byte - ch chunk + ch Chunk lastTimeHead model.Time ) if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil { @@ -146,7 +146,7 @@ func (hs *headsScanner) scan() bool { if numChunkDescs, hs.err = binary.ReadVarint(hs.r); hs.err != nil { return false } - chunkDescs := make([]*chunkDesc, numChunkDescs) + chunkDescs := make([]*ChunkDesc, numChunkDescs) if hs.version == headsFormatLegacyVersion { if headChunkPersisted { persistWatermark = numChunkDescs @@ -163,7 +163,7 @@ func (hs *headsScanner) scan() bool { if lastTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil { return false } - chunkDescs[i] = &chunkDesc{ + chunkDescs[i] = &ChunkDesc{ chunkFirstTime: model.Time(firstTime), chunkLastTime: model.Time(lastTime), } @@ -176,13 +176,13 @@ func (hs *headsScanner) scan() bool { if encoding, hs.err = hs.r.ReadByte(); hs.err != nil { return false } - if ch, hs.err = newChunkForEncoding(chunkEncoding(encoding)); hs.err != nil { + if ch, hs.err = NewChunkForEncoding(ChunkEncoding(encoding)); hs.err != nil { return false } - if hs.err = ch.unmarshal(hs.r); hs.err != nil { + if hs.err = ch.Unmarshal(hs.r); hs.err != nil { return false } - cd := newChunkDesc(ch, ch.firstTime()) + cd := NewChunkDesc(ch, ch.FirstTime()) if i < numChunkDescs-1 { // This is NOT the head chunk. So it's a chunk // to be persisted, and we need to populate lastTime. diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 517104fd05..b0e1e3a2d8 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -370,7 +370,7 @@ func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelVa // // Returning an error signals problems with the series file. In this case, the // caller should quarantine the series. -func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index int, err error) { +func (p *persistence) persistChunks(fp model.Fingerprint, chunks []Chunk) (index int, err error) { f, err := p.openChunkFileForWriting(fp) if err != nil { return -1, err @@ -399,14 +399,14 @@ func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index // incrementally larger indexes. The indexOffset denotes the offset to be added to // each index in indexes. It is the caller's responsibility to not persist or // drop anything for the same fingerprint concurrently. -func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) { +func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]Chunk, error) { f, err := p.openChunkFileForReading(fp) if err != nil { return nil, err } defer f.Close() - chunks := make([]chunk, 0, len(indexes)) + chunks := make([]Chunk, 0, len(indexes)) buf := p.bufPool.Get().([]byte) defer func() { // buf may change below. An unwrapped 'defer p.bufPool.Put(buf)' @@ -436,11 +436,11 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse return nil, err } for c := 0; c < batchSize; c++ { - chunk, err := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset])) + chunk, err := NewChunkForEncoding(ChunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset])) if err != nil { return nil, err } - if err := chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil { + if err := chunk.UnmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil { return nil, err } chunks = append(chunks, chunk) @@ -455,7 +455,7 @@ func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffse // the number of chunkDescs to skip from the end of the series file. It is the // caller's responsibility to not persist or drop anything for the same // fingerprint concurrently. -func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) { +func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*ChunkDesc, error) { f, err := p.openChunkFileForReading(fp) if os.IsNotExist(err) { return nil, nil @@ -478,7 +478,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([ } numChunks := int(fi.Size())/chunkLenWithHeader - offsetFromEnd - cds := make([]*chunkDesc, numChunks) + cds := make([]*ChunkDesc, numChunks) chunkTimesBuf := make([]byte, 16) for i := 0; i < numChunks; i++ { _, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET) @@ -490,7 +490,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([ if err != nil { return nil, err } - cds[i] = &chunkDesc{ + cds[i] = &ChunkDesc{ chunkFirstTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf)), chunkLastTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf[8:])), } @@ -542,7 +542,7 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([ // (4.8.1.2) The varint-encoded last time. // // (4.8.2.1) A byte defining the chunk type. -// (4.8.2.2) The chunk itself, marshaled with the marshal() method. +// (4.8.2.2) The chunk itself, marshaled with the Marshal() method. // func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) { log.Info("Checkpointing in-memory metrics and chunks...") @@ -657,10 +657,10 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap } } else { // This is a non-persisted chunk. Fully marshal it. - if err = w.WriteByte(byte(chunkDesc.c.encoding())); err != nil { + if err = w.WriteByte(byte(chunkDesc.c.Encoding())); err != nil { return } - if err = chunkDesc.c.marshal(w); err != nil { + if err = chunkDesc.c.Marshal(w); err != nil { return } } @@ -751,7 +751,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in // Returning an error signals problems with the series file. In this case, the // caller should quarantine the series. func (p *persistence) dropAndPersistChunks( - fp model.Fingerprint, beforeTime model.Time, chunks []chunk, + fp model.Fingerprint, beforeTime model.Time, chunks []Chunk, ) ( firstTimeNotDropped model.Time, offset int, @@ -769,7 +769,7 @@ func (p *persistence) dropAndPersistChunks( i := 0 for ; i < len(chunks); i++ { var lt model.Time - lt, err = chunks[i].newIterator().lastTimestamp() + lt, err = chunks[i].NewIterator().LastTimestamp() if err != nil { return } @@ -778,7 +778,7 @@ func (p *persistence) dropAndPersistChunks( } } if i < len(chunks) { - firstTimeNotDropped = chunks[i].firstTime() + firstTimeNotDropped = chunks[i].FirstTime() } if i > 0 || firstTimeNotDropped.Before(beforeTime) { // Series file has to go. @@ -1500,7 +1500,7 @@ func (p *persistence) loadFPMappings() (fpMappings, model.Fingerprint, error) { return fpm, highestMappedFP, nil } -func (p *persistence) writeChunks(w io.Writer, chunks []chunk) error { +func (p *persistence) writeChunks(w io.Writer, chunks []Chunk) error { b := p.bufPool.Get().([]byte) defer func() { // buf may change below. An unwrapped 'defer p.bufPool.Put(buf)' @@ -1522,7 +1522,7 @@ func (p *persistence) writeChunks(w io.Writer, chunks []chunk) error { if err := writeChunkHeader(b[i*chunkLenWithHeader:], chunk); err != nil { return err } - if err := chunk.marshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil { + if err := chunk.MarshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil { return err } } @@ -1547,13 +1547,13 @@ func chunkIndexForOffset(offset int64) (int, error) { return int(offset) / chunkLenWithHeader, nil } -func writeChunkHeader(header []byte, c chunk) error { - header[chunkHeaderTypeOffset] = byte(c.encoding()) +func writeChunkHeader(header []byte, c Chunk) error { + header[chunkHeaderTypeOffset] = byte(c.Encoding()) binary.LittleEndian.PutUint64( header[chunkHeaderFirstTimeOffset:], - uint64(c.firstTime()), + uint64(c.FirstTime()), ) - lt, err := c.newIterator().lastTimestamp() + lt, err := c.NewIterator().LastTimestamp() if err != nil { return err } diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index d7141b059a..51bb6a2d57 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -38,7 +38,7 @@ var ( m5 = model.Metric{"label": "value5"} ) -func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, testutil.Closer) { +func newTestPersistence(t *testing.T, encoding ChunkEncoding) (*persistence, testutil.Closer) { DefaultChunkEncoding = encoding dir := testutil.NewTemporaryDirectory("test_persistence", t) p, err := newPersistence(dir.Path(), false, false, func() bool { return false }, 0.1) @@ -53,22 +53,22 @@ func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, tes }) } -func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint][]chunk { +func buildTestChunks(t *testing.T, encoding ChunkEncoding) map[model.Fingerprint][]Chunk { fps := model.Fingerprints{ m1.FastFingerprint(), m2.FastFingerprint(), m3.FastFingerprint(), } - fpToChunks := map[model.Fingerprint][]chunk{} + fpToChunks := map[model.Fingerprint][]Chunk{} for _, fp := range fps { - fpToChunks[fp] = make([]chunk, 0, 10) + fpToChunks[fp] = make([]Chunk, 0, 10) for i := 0; i < 10; i++ { - ch, err := newChunkForEncoding(encoding) + ch, err := NewChunkForEncoding(encoding) if err != nil { t.Fatal(err) } - chs, err := ch.add(model.SamplePair{ + chs, err := ch.Add(model.SamplePair{ Timestamp: model.Time(i), Value: model.SampleValue(fp), }) @@ -81,18 +81,18 @@ func buildTestChunks(t *testing.T, encoding chunkEncoding) map[model.Fingerprint return fpToChunks } -func chunksEqual(c1, c2 chunk) bool { - it1 := c1.newIterator() - it2 := c2.newIterator() - for it1.scan() && it2.scan() { - if !(it1.value() == it2.value()) { +func chunksEqual(c1, c2 Chunk) bool { + it1 := c1.NewIterator() + it2 := c2.NewIterator() + for it1.Scan() && it2.Scan() { + if !(it1.Value() == it2.Value()) { return false } } - return it1.err() == nil && it2.err() == nil + return it1.Err() == nil && it2.Err() == nil } -func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) { +func testPersistLoadDropChunks(t *testing.T, encoding ChunkEncoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() @@ -450,7 +450,7 @@ func TestPersistLoadDropChunksType1(t *testing.T) { testPersistLoadDropChunks(t, 1) } -func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding) { +func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding ChunkEncoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() @@ -461,16 +461,16 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding s3, _ := newMemorySeries(m3, nil, time.Time{}) s4, _ := newMemorySeries(m4, nil, time.Time{}) s5, _ := newMemorySeries(m5, nil, time.Time{}) - s1.add(model.SamplePair{Timestamp: 1, Value: 3.14}) - s3.add(model.SamplePair{Timestamp: 2, Value: 2.7}) + s1.Add(model.SamplePair{Timestamp: 1, Value: 3.14}) + s3.Add(model.SamplePair{Timestamp: 2, Value: 2.7}) s3.headChunkClosed = true s3.persistWatermark = 1 for i := 0; i < 10000; i++ { - s4.add(model.SamplePair{ + s4.Add(model.SamplePair{ Timestamp: model.Time(i), Value: model.SampleValue(i) / 2, }) - s5.add(model.SamplePair{ + s5.Add(model.SamplePair{ Timestamp: model.Time(i), Value: model.SampleValue(i * i), }) @@ -562,10 +562,10 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding t.Error("headChunkClosed is true") } for i, cd := range loadedS4.chunkDescs { - if cd.chunkFirstTime != cd.c.firstTime() { + if cd.chunkFirstTime != cd.c.FirstTime() { t.Errorf( "chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d", - i, cd.c.firstTime(), cd.chunkFirstTime, + i, cd.c.FirstTime(), cd.chunkFirstTime, ) } if i == len(loadedS4.chunkDescs)-1 { @@ -575,7 +575,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding } continue } - lastTime, err := cd.c.newIterator().lastTimestamp() + lastTime, err := cd.c.NewIterator().LastTimestamp() if err != nil { t.Fatal(err) } @@ -619,10 +619,10 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding } continue } - if cd.chunkFirstTime != cd.c.firstTime() { + if cd.chunkFirstTime != cd.c.FirstTime() { t.Errorf( "chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d", - i, cd.c.firstTime(), cd.chunkFirstTime, + i, cd.c.FirstTime(), cd.chunkFirstTime, ) } if i == len(loadedS5.chunkDescs)-1 { @@ -632,7 +632,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding } continue } - lastTime, err := cd.c.newIterator().lastTimestamp() + lastTime, err := cd.c.NewIterator().LastTimestamp() if err != nil { t.Fatal(err) } @@ -690,7 +690,7 @@ func TestCheckpointAndLoadFPMappings(t *testing.T) { } } -func testFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) { +func testFingerprintsModifiedBefore(t *testing.T, encoding ChunkEncoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() @@ -769,7 +769,7 @@ func TestFingerprintsModifiedBeforeChunkType2(t *testing.T) { testFingerprintsModifiedBefore(t, 2) } -func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) { +func testDropArchivedMetric(t *testing.T, encoding ChunkEncoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() @@ -843,7 +843,7 @@ type incrementalBatch struct { expectedLpToFps index.LabelPairFingerprintsMapping } -func testIndexing(t *testing.T, encoding chunkEncoding) { +func testIndexing(t *testing.T, encoding ChunkEncoding) { batches := []incrementalBatch{ { fpToMetric: index.FingerprintMetricMapping{ diff --git a/storage/local/series.go b/storage/local/series.go index 35b1f1a369..bbc7fbce7d 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -140,7 +140,7 @@ func (sm *seriesMap) fpIter() <-chan model.Fingerprint { type memorySeries struct { metric model.Metric // Sorted by start time, overlapping chunk ranges are forbidden. - chunkDescs []*chunkDesc + chunkDescs []*ChunkDesc // The index (within chunkDescs above) of the first chunkDesc that // points to a non-persisted chunk. If all chunks are persisted, then // persistWatermark == len(chunkDescs). @@ -193,7 +193,7 @@ type memorySeries struct { // set to model.Earliest. The zero value for modTime can be used if the // modification time of the series file is unknown (e.g. if this is a genuinely // new series). -func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) (*memorySeries, error) { +func newMemorySeries(m model.Metric, chunkDescs []*ChunkDesc, modTime time.Time) (*memorySeries, error) { var err error firstTime := model.Earliest lastTime := model.Earliest @@ -218,9 +218,9 @@ func newMemorySeries(m model.Metric, chunkDescs []*chunkDesc, modTime time.Time) // completed chunks (which are now eligible for persistence). // // The caller must have locked the fingerprint of the series. -func (s *memorySeries) add(v model.SamplePair) (int, error) { +func (s *memorySeries) Add(v model.SamplePair) (int, error) { if len(s.chunkDescs) == 0 || s.headChunkClosed { - newHead := newChunkDesc(newChunk(), v.Timestamp) + newHead := NewChunkDesc(NewChunk(), v.Timestamp) s.chunkDescs = append(s.chunkDescs, newHead) s.headChunkClosed = false } else if s.headChunkUsedByIterator && s.head().refCount() > 1 { @@ -236,18 +236,18 @@ func (s *memorySeries) add(v model.SamplePair) (int, error) { chunkOps.WithLabelValues(clone).Inc() // No locking needed here because a non-persisted head chunk can // not get evicted concurrently. - s.head().c = s.head().c.clone() + s.head().c = s.head().c.Clone() s.headChunkUsedByIterator = false } - chunks, err := s.head().add(v) + chunks, err := s.head().Add(v) if err != nil { return 0, err } s.head().c = chunks[0] for _, c := range chunks[1:] { - s.chunkDescs = append(s.chunkDescs, newChunkDesc(c, c.firstTime())) + s.chunkDescs = append(s.chunkDescs, NewChunkDesc(c, c.FirstTime())) } // Populate lastTime of now-closed chunks. @@ -287,14 +287,14 @@ func (s *memorySeries) maybeCloseHeadChunk() bool { func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) { lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted) if lenToKeep < len(s.chunkDescs) { - s.savedFirstTime = s.firstTime() + s.savedFirstTime = s.FirstTime() lenEvicted := len(s.chunkDescs) - lenToKeep s.chunkDescsOffset += lenEvicted s.persistWatermark -= lenEvicted chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted)) numMemChunkDescs.Sub(float64(lenEvicted)) s.chunkDescs = append( - make([]*chunkDesc, 0, lenToKeep), + make([]*ChunkDesc, 0, lenToKeep), s.chunkDescs[lenEvicted:]..., ) s.dirty = true @@ -324,7 +324,7 @@ func (s *memorySeries) dropChunks(t model.Time) error { return nil } s.chunkDescs = append( - make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), + make([]*ChunkDesc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]..., ) s.persistWatermark -= keepIdx @@ -344,7 +344,7 @@ func (s *memorySeries) preloadChunks( indexes []int, fp model.Fingerprint, mss *MemorySeriesStorage, ) (SeriesIterator, error) { loadIndexes := []int{} - pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes)) + pinnedChunkDescs := make([]*ChunkDesc, 0, len(indexes)) for _, idx := range indexes { cd := s.chunkDescs[idx] pinnedChunkDescs = append(pinnedChunkDescs, cd) @@ -382,23 +382,23 @@ func (s *memorySeries) preloadChunks( } iter := &boundedIterator{ - it: s.newIterator(pinnedChunkDescs, curriedQuarantineSeries, mss.evictRequests), + it: s.NewIterator(pinnedChunkDescs, curriedQuarantineSeries, mss.evictRequests), start: model.Now().Add(-mss.dropAfter), } return iter, nil } -// newIterator returns a new SeriesIterator for the provided chunkDescs (which +// NewIterator( returns a new SeriesIterator for the provided chunkDescs (which // must be pinned). // // The caller must have locked the fingerprint of the memorySeries. -func (s *memorySeries) newIterator( - pinnedChunkDescs []*chunkDesc, +func (s *memorySeries) NewIterator( + pinnedChunkDescs []*ChunkDesc, quarantine func(error), evictRequests chan<- evictRequest, ) SeriesIterator { - chunks := make([]chunk, 0, len(pinnedChunkDescs)) + chunks := make([]Chunk, 0, len(pinnedChunkDescs)) for _, cd := range pinnedChunkDescs { // It's OK to directly access cd.c here (without locking) as the // series FP is locked and the chunk is pinned. @@ -406,7 +406,7 @@ func (s *memorySeries) newIterator( } return &memorySeriesIterator{ chunks: chunks, - chunkIts: make([]chunkIterator, len(chunks)), + chunkIts: make([]ChunkIterator, len(chunks)), quarantine: quarantine, metric: s.metric, pinnedChunkDescs: pinnedChunkDescs, @@ -506,14 +506,14 @@ func (s *memorySeries) preloadChunksForRange( // head returns a pointer to the head chunk descriptor. The caller must have // locked the fingerprint of the memorySeries. This method will panic if this // series has no chunk descriptors. -func (s *memorySeries) head() *chunkDesc { +func (s *memorySeries) head() *ChunkDesc { return s.chunkDescs[len(s.chunkDescs)-1] } // firstTime returns the timestamp of the first sample in the series. // // The caller must have locked the fingerprint of the memorySeries. -func (s *memorySeries) firstTime() model.Time { +func (s *memorySeries) FirstTime() model.Time { if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 { return s.chunkDescs[0].firstTime() } @@ -543,7 +543,7 @@ func (s *memorySeries) lastSamplePair() model.SamplePair { // accordingly. // // The caller must have locked the fingerprint of the series. -func (s *memorySeries) chunksToPersist() []*chunkDesc { +func (s *memorySeries) chunksToPersist() []*ChunkDesc { newWatermark := len(s.chunkDescs) if !s.headChunkClosed { newWatermark-- @@ -560,17 +560,17 @@ func (s *memorySeries) chunksToPersist() []*chunkDesc { // memorySeriesIterator implements SeriesIterator. type memorySeriesIterator struct { // Last chunkIterator used by ValueAtOrBeforeTime. - chunkIt chunkIterator + chunkIt ChunkIterator // Caches chunkIterators. - chunkIts []chunkIterator + chunkIts []ChunkIterator // The actual sample chunks. - chunks []chunk + chunks []Chunk // Call to quarantine the series this iterator belongs to. quarantine func(error) // The metric corresponding to the iterator. metric model.Metric // Chunks that were pinned for this iterator. - pinnedChunkDescs []*chunkDesc + pinnedChunkDescs []*ChunkDesc // Where to send evict requests when unpinning pinned chunks. evictRequests chan<- evictRequest } @@ -579,17 +579,17 @@ type memorySeriesIterator struct { func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePair { // The most common case. We are iterating through a chunk. if it.chunkIt != nil { - containsT, err := it.chunkIt.contains(t) + containsT, err := it.chunkIt.Contains(t) if err != nil { it.quarantine(err) return ZeroSamplePair } if containsT { - if it.chunkIt.findAtOrBefore(t) { - return it.chunkIt.value() + if it.chunkIt.FindAtOrBefore(t) { + return it.chunkIt.Value() } - if it.chunkIt.err() != nil { - it.quarantine(it.chunkIt.err()) + if it.chunkIt.Err() != nil { + it.quarantine(it.chunkIt.Err()) } return ZeroSamplePair } @@ -599,21 +599,21 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa return ZeroSamplePair } - // Find the last chunk where firstTime() is before or equal to t. + // Find the last chunk where FirstTime() is before or equal to t. l := len(it.chunks) - 1 i := sort.Search(len(it.chunks), func(i int) bool { - return !it.chunks[l-i].firstTime().After(t) + return !it.chunks[l-i].FirstTime().After(t) }) if i == len(it.chunks) { // Even the first chunk starts after t. return ZeroSamplePair } it.chunkIt = it.chunkIterator(l - i) - if it.chunkIt.findAtOrBefore(t) { - return it.chunkIt.value() + if it.chunkIt.FindAtOrBefore(t) { + return it.chunkIt.Value() } - if it.chunkIt.err() != nil { - it.quarantine(it.chunkIt.err()) + if it.chunkIt.Err() != nil { + it.quarantine(it.chunkIt.Err()) } return ZeroSamplePair } @@ -622,12 +622,12 @@ func (it *memorySeriesIterator) ValueAtOrBeforeTime(t model.Time) model.SamplePa func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePair { // Find the first chunk for which the first sample is within the interval. i := sort.Search(len(it.chunks), func(i int) bool { - return !it.chunks[i].firstTime().Before(in.OldestInclusive) + return !it.chunks[i].FirstTime().Before(in.OldestInclusive) }) // Only now check the last timestamp of the previous chunk (which is // fairly expensive). if i > 0 { - lt, err := it.chunkIterator(i - 1).lastTimestamp() + lt, err := it.chunkIterator(i - 1).LastTimestamp() if err != nil { it.quarantine(err) return nil @@ -639,7 +639,7 @@ func (it *memorySeriesIterator) RangeValues(in metric.Interval) []model.SamplePa values := []model.SamplePair{} for j, c := range it.chunks[i:] { - if c.firstTime().After(in.NewestInclusive) { + if c.FirstTime().After(in.NewestInclusive) { break } chValues, err := rangeValues(it.chunkIterator(i+j), in) @@ -658,10 +658,10 @@ func (it *memorySeriesIterator) Metric() metric.Metric { // chunkIterator returns the chunkIterator for the chunk at position i (and // creates it if needed). -func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator { +func (it *memorySeriesIterator) chunkIterator(i int) ChunkIterator { chunkIt := it.chunkIts[i] if chunkIt == nil { - chunkIt = it.chunks[i].newIterator() + chunkIt = it.chunks[i].NewIterator() it.chunkIts[i] = chunkIt } return chunkIt diff --git a/storage/local/series_test.go b/storage/local/series_test.go index 86467c2702..e36be82f83 100644 --- a/storage/local/series_test.go +++ b/storage/local/series_test.go @@ -26,11 +26,11 @@ func TestDropChunks(t *testing.T) { t.Fatal(err) } - s.add(model.SamplePair{ + s.Add(model.SamplePair{ Timestamp: 100, Value: 42, }) - s.add(model.SamplePair{ + s.Add(model.SamplePair{ Timestamp: 110, Value: 4711, }) diff --git a/storage/local/storage.go b/storage/local/storage.go index a00da8b8ae..2a1efca657 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -90,7 +90,7 @@ var ( ) type evictRequest struct { - cd *chunkDesc + cd *ChunkDesc evict bool } @@ -662,7 +662,7 @@ func (s *MemorySeriesStorage) metricForRange( ) (model.Metric, *memorySeries, bool) { series, ok := s.fpToSeries.get(fp) if ok { - if series.lastTime.Before(from) || series.firstTime().After(through) { + if series.lastTime.Before(from) || series.FirstTime().After(through) { return nil, nil, false } return series.metric, series, true @@ -762,7 +762,7 @@ func (s *MemorySeriesStorage) Append(sample *model.Sample) error { s.discardedSamplesCount.WithLabelValues(outOfOrderTimestamp).Inc() return ErrOutOfOrderSample // Caused by the caller. } - completedChunksCount, err := series.add(model.SamplePair{ + completedChunksCount, err := series.Add(model.SamplePair{ Value: sample.Value, Timestamp: sample.Timestamp, }) @@ -833,7 +833,7 @@ func (s *MemorySeriesStorage) logThrottling() { func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) { series, ok := s.fpToSeries.get(fp) if !ok { - var cds []*chunkDesc + var cds []*ChunkDesc var modTime time.Time unarchived, err := s.persistence.unarchiveMetric(fp) if err != nil { @@ -975,13 +975,13 @@ func (s *MemorySeriesStorage) maybeEvict() { if numChunksToEvict <= 0 { return } - chunkDescsToEvict := make([]*chunkDesc, numChunksToEvict) + chunkDescsToEvict := make([]*ChunkDesc, numChunksToEvict) for i := range chunkDescsToEvict { e := s.evictList.Front() if e == nil { break } - cd := e.Value.(*chunkDesc) + cd := e.Value.(*ChunkDesc) cd.evictListElement = nil chunkDescsToEvict[i] = cd s.evictList.Remove(e) @@ -1269,7 +1269,7 @@ func (s *MemorySeriesStorage) maintainMemorySeries( if iOldestNotEvicted == -1 && model.Now().Sub(series.lastTime) > headChunkTimeout { s.fpToSeries.del(fp) s.numSeries.Dec() - s.persistence.archiveMetric(fp, series.metric, series.firstTime(), series.lastTime) + s.persistence.archiveMetric(fp, series.metric, series.FirstTime(), series.lastTime) s.seriesOps.WithLabelValues(archive).Inc() oldWatermark := atomic.LoadInt64((*int64)(&s.archiveHighWatermark)) if oldWatermark < int64(series.lastTime) { @@ -1325,12 +1325,12 @@ func (s *MemorySeriesStorage) writeMemorySeries( // Get the actual chunks from underneath the chunkDescs. // No lock required as chunks still to persist cannot be evicted. - chunks := make([]chunk, len(cds)) + chunks := make([]Chunk, len(cds)) for i, cd := range cds { chunks[i] = cd.c } - if !series.firstTime().Before(beforeTime) { + if !series.FirstTime().Before(beforeTime) { // Oldest sample not old enough, just append chunks, if any. if len(cds) == 0 { return false @@ -1413,12 +1413,12 @@ func (s *MemorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor } // See persistence.loadChunks for detailed explanation. -func (s *MemorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) { +func (s *MemorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]Chunk, error) { return s.persistence.loadChunks(fp, indexes, indexOffset) } // See persistence.loadChunkDescs for detailed explanation. -func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) { +func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*ChunkDesc, error) { return s.persistence.loadChunkDescs(fp, offsetFromEnd) } diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index cd9e866f50..a52689f705 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -68,7 +68,7 @@ func TestMatches(t *testing.T) { t.Fatal("could not retrieve series for fp", fp) } storage.fpLocker.Lock(fp) - storage.persistence.archiveMetric(fp, s.metric, s.firstTime(), s.lastTime) + storage.persistence.archiveMetric(fp, s.metric, s.FirstTime(), s.lastTime) storage.fpLocker.Unlock(fp) } @@ -785,7 +785,7 @@ func TestLoop(t *testing.T) { } } -func testChunk(t *testing.T, encoding chunkEncoding) { +func testChunk(t *testing.T, encoding ChunkEncoding) { samples := make(model.Samples, 500000) for i := range samples { samples[i] = &model.Sample{ @@ -809,12 +809,12 @@ func testChunk(t *testing.T, encoding chunkEncoding) { if cd.isEvicted() { continue } - it := cd.c.newIterator() - for it.scan() { - values = append(values, it.value()) + it := cd.c.NewIterator() + for it.Scan() { + values = append(values, it.Value()) } - if it.err() != nil { - t.Error(it.err()) + if it.Err() != nil { + t.Error(it.Err()) } } @@ -843,7 +843,7 @@ func TestChunkType2(t *testing.T) { testChunk(t, 2) } -func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) { +func testValueAtOrBeforeTime(t *testing.T, encoding ChunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -921,7 +921,7 @@ func TestValueAtTimeChunkType2(t *testing.T) { testValueAtOrBeforeTime(t, 2) } -func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) { +func benchmarkValueAtOrBeforeTime(b *testing.B, encoding ChunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -1003,7 +1003,7 @@ func BenchmarkValueAtTimeChunkType2(b *testing.B) { benchmarkValueAtOrBeforeTime(b, 2) } -func testRangeValues(t *testing.T, encoding chunkEncoding) { +func testRangeValues(t *testing.T, encoding ChunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -1159,7 +1159,7 @@ func TestRangeValuesChunkType2(t *testing.T) { testRangeValues(t, 2) } -func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) { +func benchmarkRangeValues(b *testing.B, encoding ChunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -1207,7 +1207,7 @@ func BenchmarkRangeValuesChunkType2(b *testing.B) { benchmarkRangeValues(b, 2) } -func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { +func testEvictAndPurgeSeries(t *testing.T, encoding ChunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -1275,7 +1275,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if err != nil { t.Fatal(err) } - s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime) + s.persistence.archiveMetric(fp, series.metric, series.FirstTime(), lastTime) archived, _, _ := s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("not archived") @@ -1316,7 +1316,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { if err != nil { t.Fatal(err) } - s.persistence.archiveMetric(fp, series.metric, series.firstTime(), lastTime) + s.persistence.archiveMetric(fp, series.metric, series.FirstTime(), lastTime) archived, _, _ = s.persistence.hasArchivedMetric(fp) if !archived { t.Fatal("not archived") @@ -1362,7 +1362,7 @@ func TestEvictAndPurgeSeriesChunkType2(t *testing.T) { testEvictAndPurgeSeries(t, 2) } -func testEvictAndLoadChunkDescs(t *testing.T, encoding chunkEncoding) { +func testEvictAndLoadChunkDescs(t *testing.T, encoding ChunkEncoding) { samples := make(model.Samples, 10000) for i := range samples { samples[i] = &model.Sample{ @@ -1433,7 +1433,7 @@ func TestEvictAndLoadChunkDescsType1(t *testing.T) { testEvictAndLoadChunkDescs(t, 1) } -func benchmarkAppend(b *testing.B, encoding chunkEncoding) { +func benchmarkAppend(b *testing.B, encoding ChunkEncoding) { samples := make(model.Samples, b.N) for i := range samples { samples[i] = &model.Sample{ @@ -1469,7 +1469,7 @@ func BenchmarkAppendType2(b *testing.B) { // Append a large number of random samples and then check if we can get them out // of the storage alright. -func testFuzz(t *testing.T, encoding chunkEncoding) { +func testFuzz(t *testing.T, encoding ChunkEncoding) { if testing.Short() { t.Skip("Skipping test in short mode.") } @@ -1517,7 +1517,7 @@ func TestFuzzChunkType2(t *testing.T) { // make things even slower): // // go test -race -cpu 8 -short -bench BenchmarkFuzzChunkType -func benchmarkFuzz(b *testing.B, encoding chunkEncoding) { +func benchmarkFuzz(b *testing.B, encoding ChunkEncoding) { DefaultChunkEncoding = encoding const samplesPerRun = 100000 rand.Seed(42) diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index 7c2a3a8ee1..48beef0cda 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -40,7 +40,7 @@ func (t *testStorageCloser) Close() { // NewTestStorage creates a storage instance backed by files in a temporary // directory. The returned storage is already in serving state. Upon closing the // returned test.Closer, the temporary directory is cleaned up. -func NewTestStorage(t testutil.T, encoding chunkEncoding) (*MemorySeriesStorage, testutil.Closer) { +func NewTestStorage(t testutil.T, encoding ChunkEncoding) (*MemorySeriesStorage, testutil.Closer) { DefaultChunkEncoding = encoding directory := testutil.NewTemporaryDirectory("test_storage", t) o := &MemorySeriesStorageOptions{ diff --git a/storage/local/varbit.go b/storage/local/varbit.go index cdc13936b1..3c909210a1 100644 --- a/storage/local/varbit.go +++ b/storage/local/varbit.go @@ -257,7 +257,7 @@ func newVarbitChunk(enc varbitValueEncoding) *varbitChunk { } // add implements chunk. -func (c *varbitChunk) add(s model.SamplePair) ([]chunk, error) { +func (c *varbitChunk) Add(s model.SamplePair) ([]Chunk, error) { offset := c.nextSampleOffset() switch { case c.closed(): @@ -273,19 +273,19 @@ func (c *varbitChunk) add(s model.SamplePair) ([]chunk, error) { } // clone implements chunk. -func (c varbitChunk) clone() chunk { +func (c varbitChunk) Clone() Chunk { clone := make(varbitChunk, len(c)) copy(clone, c) return &clone } -// newIterator implements chunk. -func (c varbitChunk) newIterator() chunkIterator { +// NewIterator implements chunk. +func (c varbitChunk) NewIterator() ChunkIterator { return newVarbitChunkIterator(c) } // marshal implements chunk. -func (c varbitChunk) marshal(w io.Writer) error { +func (c varbitChunk) Marshal(w io.Writer) error { n, err := w.Write(c) if err != nil { return err @@ -297,7 +297,7 @@ func (c varbitChunk) marshal(w io.Writer) error { } // marshalToBuf implements chunk. -func (c varbitChunk) marshalToBuf(buf []byte) error { +func (c varbitChunk) MarshalToBuf(buf []byte) error { n := copy(buf, c) if n != len(c) { return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n) @@ -306,13 +306,13 @@ func (c varbitChunk) marshalToBuf(buf []byte) error { } // unmarshal implements chunk. -func (c varbitChunk) unmarshal(r io.Reader) error { +func (c varbitChunk) Unmarshal(r io.Reader) error { _, err := io.ReadFull(r, c) return err } // unmarshalFromBuf implements chunk. -func (c varbitChunk) unmarshalFromBuf(buf []byte) error { +func (c varbitChunk) UnmarshalFromBuf(buf []byte) error { if copied := copy(c, buf); copied != cap(c) { return fmt.Errorf("insufficient bytes copied from buffer during unmarshaling, want %d, got %d", cap(c), copied) } @@ -320,10 +320,10 @@ func (c varbitChunk) unmarshalFromBuf(buf []byte) error { } // encoding implements chunk. -func (c varbitChunk) encoding() chunkEncoding { return varbit } +func (c varbitChunk) Encoding() ChunkEncoding { return Varbit } // firstTime implements chunk. -func (c varbitChunk) firstTime() model.Time { +func (c varbitChunk) FirstTime() model.Time { return model.Time( binary.BigEndian.Uint64( c[varbitFirstTimeOffset:], @@ -472,7 +472,7 @@ func (c varbitChunk) setLastSample(s model.SamplePair) { // addFirstSample is a helper method only used by c.add(). It adds timestamp and // value as base time and value. -func (c *varbitChunk) addFirstSample(s model.SamplePair) []chunk { +func (c *varbitChunk) addFirstSample(s model.SamplePair) []Chunk { binary.BigEndian.PutUint64( (*c)[varbitFirstTimeOffset:], uint64(s.Timestamp), @@ -483,14 +483,14 @@ func (c *varbitChunk) addFirstSample(s model.SamplePair) []chunk { ) c.setLastSample(s) // To simplify handling of single-sample chunks. c.setNextSampleOffset(varbitSecondSampleBitOffset) - return []chunk{c} + return []Chunk{c} } // addSecondSample is a helper method only used by c.add(). It calculates the // first time delta from the provided sample and adds it to the chunk together // with the provided sample as the last sample. -func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]chunk, error) { - firstTimeDelta := s.Timestamp - c.firstTime() +func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]Chunk, error) { + firstTimeDelta := s.Timestamp - c.FirstTime() if firstTimeDelta < 0 { return nil, fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta) } @@ -509,7 +509,7 @@ func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]chunk, error) { c.setLastSample(s) c.setNextSampleOffset(varbitThirdSampleBitOffset) - return []chunk{c}, nil + return []Chunk{c}, nil } // addLastSample isa a helper method only used by c.add() and in other helper @@ -518,15 +518,15 @@ func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]chunk, error) { // adds the very last sample added to this chunk ever, while setLastSample sets // the sample most recently added to the chunk so that it can be used for the // calculations required to add the next sample. -func (c *varbitChunk) addLastSample(s model.SamplePair) []chunk { +func (c *varbitChunk) addLastSample(s model.SamplePair) []Chunk { c.setLastSample(s) (*c)[varbitFlagOffset] |= 0x80 - return []chunk{c} + return []Chunk{c} } // addLaterSample is a helper method only used by c.add(). It adds a third or // later sample. -func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]chunk, error) { +func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk, error) { var ( lastTime = c.lastTime() lastTimeDelta = c.lastTimeDelta() @@ -593,7 +593,7 @@ func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]chunk c.setNextSampleOffset(offset) c.setLastSample(s) - return []chunk{c}, nil + return []Chunk{c}, nil } func (c varbitChunk) prepForThirdSample( @@ -904,7 +904,7 @@ func newVarbitChunkIterator(c varbitChunk) *varbitChunkIterator { } // lastTimestamp implements chunkIterator. -func (it *varbitChunkIterator) lastTimestamp() (model.Time, error) { +func (it *varbitChunkIterator) LastTimestamp() (model.Time, error) { if it.len == varbitFirstSampleBitOffset { // No samples in the chunk yet. return model.Earliest, it.lastError @@ -913,18 +913,18 @@ func (it *varbitChunkIterator) lastTimestamp() (model.Time, error) { } // contains implements chunkIterator. -func (it *varbitChunkIterator) contains(t model.Time) (bool, error) { - last, err := it.lastTimestamp() +func (it *varbitChunkIterator) Contains(t model.Time) (bool, error) { + last, err := it.LastTimestamp() if err != nil { it.lastError = err return false, err } - return !t.Before(it.c.firstTime()) && + return !t.Before(it.c.FirstTime()) && !t.After(last), it.lastError } // scan implements chunkIterator. -func (it *varbitChunkIterator) scan() bool { +func (it *varbitChunkIterator) Scan() bool { if it.lastError != nil { return false } @@ -947,7 +947,7 @@ func (it *varbitChunkIterator) scan() bool { return it.lastError == nil } if it.pos == varbitFirstSampleBitOffset { - it.t = it.c.firstTime() + it.t = it.c.FirstTime() it.v = it.c.firstValue() it.pos = varbitSecondSampleBitOffset return it.lastError == nil @@ -1003,8 +1003,8 @@ func (it *varbitChunkIterator) scan() bool { } // findAtOrBefore implements chunkIterator. -func (it *varbitChunkIterator) findAtOrBefore(t model.Time) bool { - if it.len == 0 || t.Before(it.c.firstTime()) { +func (it *varbitChunkIterator) FindAtOrBefore(t model.Time) bool { + if it.len == 0 || t.Before(it.c.FirstTime()) { return false } last := it.c.lastTime() @@ -1025,7 +1025,7 @@ func (it *varbitChunkIterator) findAtOrBefore(t model.Time) bool { prevT = model.Earliest prevV model.SampleValue ) - for it.scan() && t.After(it.t) { + for it.Scan() && t.After(it.t) { prevT = it.t prevV = it.v // TODO(beorn7): If we are in a repeat, we could iterate forward @@ -1039,14 +1039,14 @@ func (it *varbitChunkIterator) findAtOrBefore(t model.Time) bool { } // findAtOrAfter implements chunkIterator. -func (it *varbitChunkIterator) findAtOrAfter(t model.Time) bool { +func (it *varbitChunkIterator) FindAtOrAfter(t model.Time) bool { if it.len == 0 || t.After(it.c.lastTime()) { return false } - first := it.c.firstTime() + first := it.c.FirstTime() if !t.After(first) { it.reset() - return it.scan() + return it.Scan() } if t == it.t { return it.lastError == nil @@ -1054,7 +1054,7 @@ func (it *varbitChunkIterator) findAtOrAfter(t model.Time) bool { if t.Before(it.t) { it.reset() } - for it.scan() && t.After(it.t) { + for it.Scan() && t.After(it.t) { // TODO(beorn7): If we are in a repeat, we could iterate forward // much faster. } @@ -1062,7 +1062,7 @@ func (it *varbitChunkIterator) findAtOrAfter(t model.Time) bool { } // value implements chunkIterator. -func (it *varbitChunkIterator) value() model.SamplePair { +func (it *varbitChunkIterator) Value() model.SamplePair { return model.SamplePair{ Timestamp: it.t, Value: it.v, @@ -1070,7 +1070,7 @@ func (it *varbitChunkIterator) value() model.SamplePair { } // err implements chunkIterator. -func (it *varbitChunkIterator) err() error { +func (it *varbitChunkIterator) Err() error { return it.lastError } diff --git a/storage/remote/remote.go b/storage/remote/remote.go index bdf19f6d6d..784001c1b4 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -89,7 +89,7 @@ type Options struct { GraphitePrefix string } -// Run starts the background processing of the storage queues. +// Start starts the background processing of the storage queues. func (s *Storage) Start() { for _, q := range s.queues { q.Start()