mirror of
				https://github.com/prometheus/prometheus.git
				synced 2025-10-25 14:31:01 +02:00 
			
		
		
		
	- Pick At... method via return value of Next/Seek. - Do not clobber returned buckets. - Add partial FloatHistogram suppert. Note that the promql package is now _only_ dealing with FloatHistograms, following the idea that PromQL only knows float values. As a byproduct, I have removed the histogramSeries metric. In my understanding, series can have both float and histogram samples, so that metric doesn't make sense anymore. As another byproduct, I have converged the sampleBuf and the histogramSampleBuf in memSeries into one. The sample type stored in the sampleBuf has been extended to also contain histograms even before this commit. Signed-off-by: beorn7 <beorn@grafana.com>
		
			
				
	
	
		
			504 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			504 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2017 The Prometheus Authors
 | |
| // Licensed under the Apache License, Version 2.0 (the "License");
 | |
| // you may not use this file except in compliance with the License.
 | |
| // You may obtain a copy of the License at
 | |
| //
 | |
| // http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing, software
 | |
| // distributed under the License is distributed on an "AS IS" BASIS,
 | |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| // See the License for the specific language governing permissions and
 | |
| // limitations under the License.
 | |
| 
 | |
| // The code in this file was largely written by Damian Gryski as part of
 | |
| // https://github.com/dgryski/go-tsz and published under the license below.
 | |
| // It was modified to accommodate reading from byte slices without modifying
 | |
| // the underlying bytes, which would panic when reading from mmap'd
 | |
| // read-only byte slices.
 | |
| 
 | |
| // Copyright (c) 2015,2016 Damian Gryski <damian@gryski.com>
 | |
| // All rights reserved.
 | |
| 
 | |
| // Redistribution and use in source and binary forms, with or without
 | |
| // modification, are permitted provided that the following conditions are met:
 | |
| 
 | |
| // * Redistributions of source code must retain the above copyright notice,
 | |
| // this list of conditions and the following disclaimer.
 | |
| //
 | |
| // * Redistributions in binary form must reproduce the above copyright notice,
 | |
| // this list of conditions and the following disclaimer in the documentation
 | |
| // and/or other materials provided with the distribution.
 | |
| //
 | |
| // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 | |
| // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 | |
| // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 | |
| // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
 | |
| // FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 | |
| // DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 | |
| // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
 | |
| // CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 | |
| // OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 | |
| // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 | |
| 
 | |
| package chunkenc
 | |
| 
 | |
| import (
 | |
| 	"encoding/binary"
 | |
| 	"math"
 | |
| 	"math/bits"
 | |
| 
 | |
| 	"github.com/prometheus/prometheus/model/histogram"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	chunkCompactCapacityThreshold = 32
 | |
| )
 | |
| 
 | |
| // XORChunk holds XOR encoded sample data.
 | |
| type XORChunk struct {
 | |
| 	b bstream
 | |
| }
 | |
| 
 | |
| // NewXORChunk returns a new chunk with XOR encoding of the given size.
 | |
| func NewXORChunk() *XORChunk {
 | |
| 	b := make([]byte, 2, 128)
 | |
| 	return &XORChunk{b: bstream{stream: b, count: 0}}
 | |
| }
 | |
| 
 | |
| // Encoding returns the encoding type.
 | |
| func (c *XORChunk) Encoding() Encoding {
 | |
| 	return EncXOR
 | |
| }
 | |
| 
 | |
| // Bytes returns the underlying byte slice of the chunk.
 | |
| func (c *XORChunk) Bytes() []byte {
 | |
| 	return c.b.bytes()
 | |
| }
 | |
| 
 | |
| // NumSamples returns the number of samples in the chunk.
 | |
| func (c *XORChunk) NumSamples() int {
 | |
| 	return int(binary.BigEndian.Uint16(c.Bytes()))
 | |
| }
 | |
| 
 | |
| // Compact implements the Chunk interface.
 | |
| func (c *XORChunk) Compact() {
 | |
| 	if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold {
 | |
| 		buf := make([]byte, l)
 | |
| 		copy(buf, c.b.stream)
 | |
| 		c.b.stream = buf
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Appender implements the Chunk interface.
 | |
| func (c *XORChunk) Appender() (Appender, error) {
 | |
| 	it := c.iterator(nil)
 | |
| 
 | |
| 	// To get an appender we must know the state it would have if we had
 | |
| 	// appended all existing data from scratch.
 | |
| 	// We iterate through the end and populate via the iterator's state.
 | |
| 	for it.Next() != ValNone {
 | |
| 	}
 | |
| 	if err := it.Err(); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	a := &xorAppender{
 | |
| 		b:        &c.b,
 | |
| 		t:        it.t,
 | |
| 		v:        it.val,
 | |
| 		tDelta:   it.tDelta,
 | |
| 		leading:  it.leading,
 | |
| 		trailing: it.trailing,
 | |
| 	}
 | |
| 	if it.numTotal == 0 {
 | |
| 		a.leading = 0xff
 | |
| 	}
 | |
| 	return a, nil
 | |
| }
 | |
| 
 | |
| func (c *XORChunk) iterator(it Iterator) *xorIterator {
 | |
| 	// Should iterators guarantee to act on a copy of the data so it doesn't lock append?
 | |
| 	// When using striped locks to guard access to chunks, probably yes.
 | |
| 	// Could only copy data if the chunk is not completed yet.
 | |
| 	if xorIter, ok := it.(*xorIterator); ok {
 | |
| 		xorIter.Reset(c.b.bytes())
 | |
| 		return xorIter
 | |
| 	}
 | |
| 	return &xorIterator{
 | |
| 		// The first 2 bytes contain chunk headers.
 | |
| 		// We skip that for actual samples.
 | |
| 		br:       newBReader(c.b.bytes()[2:]),
 | |
| 		numTotal: binary.BigEndian.Uint16(c.b.bytes()),
 | |
| 		t:        math.MinInt64,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Iterator implements the Chunk interface.
 | |
| func (c *XORChunk) Iterator(it Iterator) Iterator {
 | |
| 	return c.iterator(it)
 | |
| }
 | |
| 
 | |
| type xorAppender struct {
 | |
| 	b *bstream
 | |
| 
 | |
| 	t      int64
 | |
| 	v      float64
 | |
| 	tDelta uint64
 | |
| 
 | |
| 	leading  uint8
 | |
| 	trailing uint8
 | |
| }
 | |
| 
 | |
| func (a *xorAppender) AppendHistogram(t int64, h *histogram.Histogram) {
 | |
| 	panic("appended a histogram to an xor chunk")
 | |
| }
 | |
| 
 | |
| func (a *xorAppender) Append(t int64, v float64) {
 | |
| 	var tDelta uint64
 | |
| 	num := binary.BigEndian.Uint16(a.b.bytes())
 | |
| 
 | |
| 	if num == 0 {
 | |
| 		buf := make([]byte, binary.MaxVarintLen64)
 | |
| 		for _, b := range buf[:binary.PutVarint(buf, t)] {
 | |
| 			a.b.writeByte(b)
 | |
| 		}
 | |
| 		a.b.writeBits(math.Float64bits(v), 64)
 | |
| 
 | |
| 	} else if num == 1 {
 | |
| 		tDelta = uint64(t - a.t)
 | |
| 
 | |
| 		buf := make([]byte, binary.MaxVarintLen64)
 | |
| 		for _, b := range buf[:binary.PutUvarint(buf, tDelta)] {
 | |
| 			a.b.writeByte(b)
 | |
| 		}
 | |
| 
 | |
| 		a.writeVDelta(v)
 | |
| 
 | |
| 	} else {
 | |
| 		tDelta = uint64(t - a.t)
 | |
| 		dod := int64(tDelta - a.tDelta)
 | |
| 
 | |
| 		// Gorilla has a max resolution of seconds, Prometheus milliseconds.
 | |
| 		// Thus we use higher value range steps with larger bit size.
 | |
| 		//
 | |
| 		// TODO(beorn7): This seems to needlessly jump to large bit
 | |
| 		// sizes even for very small deviations from zero. Timestamp
 | |
| 		// compression can probably benefit from some smaller bit
 | |
| 		// buckets. See also what was done for histogram encoding in
 | |
| 		// varbit.go.
 | |
| 		switch {
 | |
| 		case dod == 0:
 | |
| 			a.b.writeBit(zero)
 | |
| 		case bitRange(dod, 14):
 | |
| 			a.b.writeBits(0b10, 2)
 | |
| 			a.b.writeBits(uint64(dod), 14)
 | |
| 		case bitRange(dod, 17):
 | |
| 			a.b.writeBits(0b110, 3)
 | |
| 			a.b.writeBits(uint64(dod), 17)
 | |
| 		case bitRange(dod, 20):
 | |
| 			a.b.writeBits(0b1110, 4)
 | |
| 			a.b.writeBits(uint64(dod), 20)
 | |
| 		default:
 | |
| 			a.b.writeBits(0b1111, 4)
 | |
| 			a.b.writeBits(uint64(dod), 64)
 | |
| 		}
 | |
| 
 | |
| 		a.writeVDelta(v)
 | |
| 	}
 | |
| 
 | |
| 	a.t = t
 | |
| 	a.v = v
 | |
| 	binary.BigEndian.PutUint16(a.b.bytes(), num+1)
 | |
| 	a.tDelta = tDelta
 | |
| }
 | |
| 
 | |
| // bitRange returns whether the given integer can be represented by nbits.
 | |
| // See docs/bstream.md.
 | |
| func bitRange(x int64, nbits uint8) bool {
 | |
| 	return -((1<<(nbits-1))-1) <= x && x <= 1<<(nbits-1)
 | |
| }
 | |
| 
 | |
| func (a *xorAppender) writeVDelta(v float64) {
 | |
| 	a.leading, a.trailing = xorWrite(a.b, v, a.v, a.leading, a.trailing)
 | |
| }
 | |
| 
 | |
| type xorIterator struct {
 | |
| 	br       bstreamReader
 | |
| 	numTotal uint16
 | |
| 	numRead  uint16
 | |
| 
 | |
| 	t   int64
 | |
| 	val float64
 | |
| 
 | |
| 	leading  uint8
 | |
| 	trailing uint8
 | |
| 
 | |
| 	tDelta uint64
 | |
| 	err    error
 | |
| }
 | |
| 
 | |
| func (it *xorIterator) Seek(t int64) ValueType {
 | |
| 	if it.err != nil {
 | |
| 		return ValNone
 | |
| 	}
 | |
| 
 | |
| 	for t > it.t || it.numRead == 0 {
 | |
| 		if it.Next() == ValNone {
 | |
| 			return ValNone
 | |
| 		}
 | |
| 	}
 | |
| 	return ValFloat
 | |
| }
 | |
| 
 | |
| func (it *xorIterator) At() (int64, float64) {
 | |
| 	return it.t, it.val
 | |
| }
 | |
| 
 | |
| func (it *xorIterator) AtHistogram() (int64, *histogram.Histogram) {
 | |
| 	panic("cannot call xorIterator.AtHistogram")
 | |
| }
 | |
| 
 | |
| func (it *xorIterator) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
 | |
| 	panic("cannot call xorIterator.AtFloatHistogram")
 | |
| }
 | |
| 
 | |
| func (it *xorIterator) AtT() int64 {
 | |
| 	return it.t
 | |
| }
 | |
| 
 | |
| func (it *xorIterator) Err() error {
 | |
| 	return it.err
 | |
| }
 | |
| 
 | |
| func (it *xorIterator) Reset(b []byte) {
 | |
| 	// The first 2 bytes contain chunk headers.
 | |
| 	// We skip that for actual samples.
 | |
| 	it.br = newBReader(b[2:])
 | |
| 	it.numTotal = binary.BigEndian.Uint16(b)
 | |
| 
 | |
| 	it.numRead = 0
 | |
| 	it.t = 0
 | |
| 	it.val = 0
 | |
| 	it.leading = 0
 | |
| 	it.trailing = 0
 | |
| 	it.tDelta = 0
 | |
| 	it.err = nil
 | |
| }
 | |
| 
 | |
| func (it *xorIterator) Next() ValueType {
 | |
| 	if it.err != nil || it.numRead == it.numTotal {
 | |
| 		return ValNone
 | |
| 	}
 | |
| 
 | |
| 	if it.numRead == 0 {
 | |
| 		t, err := binary.ReadVarint(&it.br)
 | |
| 		if err != nil {
 | |
| 			it.err = err
 | |
| 			return ValNone
 | |
| 		}
 | |
| 		v, err := it.br.readBits(64)
 | |
| 		if err != nil {
 | |
| 			it.err = err
 | |
| 			return ValNone
 | |
| 		}
 | |
| 		it.t = t
 | |
| 		it.val = math.Float64frombits(v)
 | |
| 
 | |
| 		it.numRead++
 | |
| 		return ValFloat
 | |
| 	}
 | |
| 	if it.numRead == 1 {
 | |
| 		tDelta, err := binary.ReadUvarint(&it.br)
 | |
| 		if err != nil {
 | |
| 			it.err = err
 | |
| 			return ValNone
 | |
| 		}
 | |
| 		it.tDelta = tDelta
 | |
| 		it.t = it.t + int64(it.tDelta)
 | |
| 
 | |
| 		return it.readValue()
 | |
| 	}
 | |
| 
 | |
| 	var d byte
 | |
| 	// read delta-of-delta
 | |
| 	for i := 0; i < 4; i++ {
 | |
| 		d <<= 1
 | |
| 		bit, err := it.br.readBitFast()
 | |
| 		if err != nil {
 | |
| 			bit, err = it.br.readBit()
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			it.err = err
 | |
| 			return ValNone
 | |
| 		}
 | |
| 		if bit == zero {
 | |
| 			break
 | |
| 		}
 | |
| 		d |= 1
 | |
| 	}
 | |
| 	var sz uint8
 | |
| 	var dod int64
 | |
| 	switch d {
 | |
| 	case 0b0:
 | |
| 		// dod == 0
 | |
| 	case 0b10:
 | |
| 		sz = 14
 | |
| 	case 0b110:
 | |
| 		sz = 17
 | |
| 	case 0b1110:
 | |
| 		sz = 20
 | |
| 	case 0b1111:
 | |
| 		// Do not use fast because it's very unlikely it will succeed.
 | |
| 		bits, err := it.br.readBits(64)
 | |
| 		if err != nil {
 | |
| 			it.err = err
 | |
| 			return ValNone
 | |
| 		}
 | |
| 
 | |
| 		dod = int64(bits)
 | |
| 	}
 | |
| 
 | |
| 	if sz != 0 {
 | |
| 		bits, err := it.br.readBitsFast(sz)
 | |
| 		if err != nil {
 | |
| 			bits, err = it.br.readBits(sz)
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			it.err = err
 | |
| 			return ValNone
 | |
| 		}
 | |
| 
 | |
| 		// Account for negative numbers, which come back as high unsigned numbers.
 | |
| 		// See docs/bstream.md.
 | |
| 		if bits > (1 << (sz - 1)) {
 | |
| 			bits -= 1 << sz
 | |
| 		}
 | |
| 		dod = int64(bits)
 | |
| 	}
 | |
| 
 | |
| 	it.tDelta = uint64(int64(it.tDelta) + dod)
 | |
| 	it.t = it.t + int64(it.tDelta)
 | |
| 
 | |
| 	return it.readValue()
 | |
| }
 | |
| 
 | |
| func (it *xorIterator) readValue() ValueType {
 | |
| 	val, leading, trailing, err := xorRead(&it.br, it.val, it.leading, it.trailing)
 | |
| 	if err != nil {
 | |
| 		it.err = err
 | |
| 		return ValNone
 | |
| 	}
 | |
| 	it.val, it.leading, it.trailing = val, leading, trailing
 | |
| 	it.numRead++
 | |
| 	return ValFloat
 | |
| }
 | |
| 
 | |
| func xorWrite(
 | |
| 	b *bstream,
 | |
| 	newValue, currentValue float64,
 | |
| 	currentLeading, currentTrailing uint8,
 | |
| ) (newLeading, newTrailing uint8) {
 | |
| 	delta := math.Float64bits(newValue) ^ math.Float64bits(currentValue)
 | |
| 
 | |
| 	if delta == 0 {
 | |
| 		b.writeBit(zero)
 | |
| 		return currentLeading, currentTrailing
 | |
| 	}
 | |
| 	b.writeBit(one)
 | |
| 
 | |
| 	newLeading = uint8(bits.LeadingZeros64(delta))
 | |
| 	newTrailing = uint8(bits.TrailingZeros64(delta))
 | |
| 
 | |
| 	// Clamp number of leading zeros to avoid overflow when encoding.
 | |
| 	if newLeading >= 32 {
 | |
| 		newLeading = 31
 | |
| 	}
 | |
| 
 | |
| 	if currentLeading != 0xff && newLeading >= currentLeading && newTrailing >= currentTrailing {
 | |
| 		// In this case, we stick with the current leading/trailing.
 | |
| 		b.writeBit(zero)
 | |
| 		b.writeBits(delta>>currentTrailing, 64-int(currentLeading)-int(currentTrailing))
 | |
| 		return currentLeading, currentTrailing
 | |
| 	}
 | |
| 
 | |
| 	b.writeBit(one)
 | |
| 	b.writeBits(uint64(newLeading), 5)
 | |
| 
 | |
| 	// Note that if newLeading == newTrailing == 0, then sigbits == 64. But
 | |
| 	// that value doesn't actually fit into the 6 bits we have.  Luckily, we
 | |
| 	// never need to encode 0 significant bits, since that would put us in
 | |
| 	// the other case (vdelta == 0).  So instead we write out a 0 and adjust
 | |
| 	// it back to 64 on unpacking.
 | |
| 	sigbits := 64 - newLeading - newTrailing
 | |
| 	b.writeBits(uint64(sigbits), 6)
 | |
| 	b.writeBits(delta>>newTrailing, int(sigbits))
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func xorRead(
 | |
| 	br *bstreamReader, currentValue float64, currentLeading, currentTrailing uint8,
 | |
| ) (newValue float64, newLeading, newTrailing uint8, err error) {
 | |
| 	var bit bit
 | |
| 	var bits uint64
 | |
| 
 | |
| 	bit, err = br.readBitFast()
 | |
| 	if err != nil {
 | |
| 		bit, err = br.readBit()
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	if bit == zero {
 | |
| 		return currentValue, currentLeading, currentTrailing, nil
 | |
| 	}
 | |
| 	bit, err = br.readBitFast()
 | |
| 	if err != nil {
 | |
| 		bit, err = br.readBit()
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	if bit == zero {
 | |
| 		// Reuse leading/trailing zero bits.
 | |
| 		newLeading, newTrailing = currentLeading, currentTrailing
 | |
| 	} else {
 | |
| 		bits, err = br.readBitsFast(5)
 | |
| 		if err != nil {
 | |
| 			bits, err = br.readBits(5)
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 		newLeading = uint8(bits)
 | |
| 
 | |
| 		bits, err = br.readBitsFast(6)
 | |
| 		if err != nil {
 | |
| 			bits, err = br.readBits(6)
 | |
| 		}
 | |
| 		if err != nil {
 | |
| 			return
 | |
| 		}
 | |
| 		mbits := uint8(bits)
 | |
| 		// 0 significant bits here means we overflowed and we actually
 | |
| 		// need 64; see comment in xrWrite.
 | |
| 		if mbits == 0 {
 | |
| 			mbits = 64
 | |
| 		}
 | |
| 		newTrailing = 64 - newLeading - mbits
 | |
| 	}
 | |
| 
 | |
| 	mbits := 64 - newLeading - newTrailing
 | |
| 	bits, err = br.readBitsFast(mbits)
 | |
| 	if err != nil {
 | |
| 		bits, err = br.readBits(mbits)
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 	vbits := math.Float64bits(currentValue)
 | |
| 	vbits ^= bits << newTrailing
 | |
| 	newValue = math.Float64frombits(vbits)
 | |
| 	return
 | |
| }
 |