diff --git a/block.go b/block.go index 400cdac786..2dab0c32d5 100644 --- a/block.go +++ b/block.go @@ -151,6 +151,9 @@ type BlockMeta struct { // Information on compactions the block was created from. Compaction BlockMetaCompaction `json:"compaction"` + + // Version of the index format. + Version int `json:"version"` } // BlockStats contains stats about contents of a block. @@ -176,12 +179,6 @@ const ( flagStd = 1 ) -type blockMeta struct { - Version int `json:"version"` - - *BlockMeta -} - const indexFilename = "index" const metaFilename = "meta.json" @@ -193,16 +190,16 @@ func readMetaFile(dir string) (*BlockMeta, error) { if err != nil { return nil, err } - var m blockMeta + var m BlockMeta if err := json.Unmarshal(b, &m); err != nil { return nil, err } - if m.Version != 1 { + if m.Version != 1 && m.Version != 2 { return nil, errors.Errorf("unexpected meta file version %d", m.Version) } - return m.BlockMeta, nil + return &m, nil } func writeMetaFile(dir string, meta *BlockMeta) error { @@ -219,7 +216,8 @@ func writeMetaFile(dir string, meta *BlockMeta) error { enc.SetIndent("", "\t") var merr MultiError - if merr.Add(enc.Encode(&blockMeta{Version: 1, BlockMeta: meta})); merr.Err() != nil { + + if merr.Add(enc.Encode(meta)); merr.Err() != nil { merr.Add(f.Close()) return merr.Err() } @@ -255,7 +253,7 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { if err != nil { return nil, err } - ir, err := index.NewFileReader(filepath.Join(dir, "index")) + ir, err := index.NewFileReader(filepath.Join(dir, "index"), meta.Version) if err != nil { return nil, err } diff --git a/block_test.go b/block_test.go index 690cd07ad1..4ebb9685d0 100644 --- a/block_test.go +++ b/block_test.go @@ -42,7 +42,7 @@ func TestSetCompactionFailed(t *testing.T) { func createEmptyBlock(t *testing.T, dir string) *Block { testutil.Ok(t, os.MkdirAll(dir, 0777)) - testutil.Ok(t, writeMetaFile(dir, &BlockMeta{})) + testutil.Ok(t, writeMetaFile(dir, &BlockMeta{Version: 2})) ir, err := 
index.NewWriter(filepath.Join(dir, indexFilename)) testutil.Ok(t, err) diff --git a/compact.go b/compact.go index 5c532759f9..f96e643945 100644 --- a/compact.go +++ b/compact.go @@ -428,6 +428,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename)) if err != nil { return errors.Wrap(err, "open index writer") } + meta.Version = indexw.Version diff --git a/Documentation/format/chunks.md b/docs/format/chunks.md similarity index 100% rename from Documentation/format/chunks.md rename to docs/format/chunks.md diff --git a/Documentation/format/index.md b/docs/format/index.md similarity index 98% rename from Documentation/format/index.md rename to docs/format/index.md index ce37999078..48366ef8dd 100644 --- a/Documentation/format/index.md +++ b/docs/format/index.md @@ -65,7 +65,7 @@ Strings are referenced by pointing to the beginning of their length field. The s ### Series The section contains a sequence of series that hold the label set of the series as well as its chunks within the block. The series are sorted lexicographically by their label sets. -The file offset to the beginning of a series serves as the series' ID in all subsequent references. Thereby, a sorted list of series IDs implies a lexicographically sorted list of series label sets. +Each series section is aligned to 16 bytes. The ID for a series is the `offset/16`. This serves as the series' ID in all subsequent references. Thereby, a sorted list of series IDs implies a lexicographically sorted list of series label sets. 
``` ┌───────────────────────────────────────┐ diff --git a/Documentation/format/tombstones.md b/docs/format/tombstones.md similarity index 100% rename from Documentation/format/tombstones.md rename to docs/format/tombstones.md diff --git a/index/index.go b/index/index.go index ad6dbb218a..df8d7b5fea 100644 --- a/index/index.go +++ b/index/index.go @@ -98,7 +98,7 @@ func newCRC32() hash.Hash32 { return crc32.New(castagnoliTable) } -// indexWriter implements the IndexWriter interface for the standard +// Writer implements the IndexWriter interface for the standard // serialization format. type Writer struct { f *os.File @@ -122,6 +122,8 @@ type Writer struct { lastSeries labels.Labels crc32 hash.Hash + + Version int } type indexTOC struct { @@ -166,6 +168,8 @@ func NewWriter(fn string) (*Writer, error) { symbols: make(map[string]uint32, 1<<13), seriesOffsets: make(map[uint64]uint64, 1<<16), crc32: newCRC32(), + + Version: 2, } if err := iw.writeMeta(); err != nil { return nil, err @@ -180,12 +184,12 @@ func (w *Writer) write(bufs ...[]byte) error { if err != nil { return err } - // For now the index file must not grow beyond 4GiB. Some of the fixed-sized + // For now the index file must not grow beyond 64GiB. Some of the fixed-sized // offset references in v1 are only 4 bytes large. // Once we move to compressed/varint representations in those areas, this limitation // can be lifted. - if w.pos > math.MaxUint32 { - return errors.Errorf("exceeding max size of 4GiB") + if w.pos > 16*math.MaxUint32 { + return errors.Errorf("exceeding max size of 64GiB") } } return nil @@ -250,6 +254,7 @@ func (w *Writer) writeMeta() error { return w.write(w.buf1.get()) } +// AddSeries adds the series one at a time along with its chunks. 
func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error { if err := w.ensureStage(idxStageSeries); err != nil { return err @@ -261,7 +266,8 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta if _, ok := w.seriesOffsets[ref]; ok { return errors.Errorf("series with reference %d already added", ref) } - w.seriesOffsets[ref] = w.pos + w.addPadding(16) + w.seriesOffsets[ref] = w.pos / 16 w.buf2.reset() w.buf2.putUvarint(len(lset)) @@ -531,9 +537,11 @@ type Reader struct { // the block has been unmapped. symbols map[uint32]string - dec *DecoderV1 + dec *Decoder crc32 hash.Hash32 + + version int } var ( @@ -563,20 +571,20 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { } // NewReader returns a new IndexReader on the given byte slice. -func NewReader(b ByteSlice) (*Reader, error) { - return newReader(b, nil) +func NewReader(b ByteSlice, version int) (*Reader, error) { + return newReader(b, nil, version) } // NewFileReader returns a new index reader against the given index file. -func NewFileReader(path string) (*Reader, error) { +func NewFileReader(path string, version int) (*Reader, error) { f, err := fileutil.OpenMmapFile(path) if err != nil { return nil, err } - return newReader(realByteSlice(f.Bytes()), f) + return newReader(realByteSlice(f.Bytes()), f, version) } -func newReader(b ByteSlice, c io.Closer) (*Reader, error) { +func newReader(b ByteSlice, c io.Closer, version int) (*Reader, error) { r := &Reader{ b: b, c: c, @@ -584,6 +592,12 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { labels: map[string]uint32{}, postings: map[labels.Label]uint32{}, crc32: newCRC32(), + version: version, + } + + if version != 1 && version != 2 { + return nil, errors.Errorf("unexpected file version %d", version) + } // Verify magic number. 
if b.Len() < 4 { @@ -622,7 +636,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { return nil, errors.Wrap(err, "read postings table") } - r.dec = &DecoderV1{symbols: r.symbols} + r.dec = &Decoder{symbols: r.symbols} return r, nil } @@ -852,9 +866,13 @@ func (r *Reader) LabelIndices() ([][]string, error) { return res, nil } -// Series the series with the given ID and writes its labels and chunks into lbls and chks. +// Series reads the series with the given ID and writes its labels and chunks into lbls and chks. func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { - d := r.decbufUvarintAt(int(id)) + offset := id + if r.version == 2 { + offset = 16 * id + } + d := r.decbufUvarintAt(int(offset)) if d.err() != nil { return d.err() } @@ -955,15 +973,15 @@ func (t *serializedStringTuples) At(i int) ([]string, error) { return res, nil } -// DecoderV1 provides decoding methods for the v1 index file format. +// Decoder provides decoding methods for the v1 and v2 index file format. // // It currently does not contain decoding methods for all entry types but can be extended // by them if there's demand. -type DecoderV1 struct { +type Decoder struct { symbols map[uint32]string } -func (dec *DecoderV1) lookupSymbol(o uint32) (string, error) { +func (dec *Decoder) lookupSymbol(o uint32) (string, error) { s, ok := dec.symbols[o] if !ok { return "", errors.Errorf("unknown symbol offset %d", o) @@ -973,12 +991,12 @@ func (dec *DecoderV1) lookupSymbol(o uint32) (string, error) { // SetSymbolTable set the symbol table to be used for lookups when decoding series // and label indices -func (dec *DecoderV1) SetSymbolTable(t map[uint32]string) { +func (dec *Decoder) SetSymbolTable(t map[uint32]string) { dec.symbols = t } // Postings returns a postings list for b and its number of elements. 
-func (dec *DecoderV1) Postings(b []byte) (int, Postings, error) { +func (dec *Decoder) Postings(b []byte) (int, Postings, error) { d := decbuf{b: b} n := d.be32int() l := d.get() @@ -986,7 +1004,7 @@ func (dec *DecoderV1) Postings(b []byte) (int, Postings, error) { } // Series decodes a series entry from the given byte slice into lset and chks. -func (dec *DecoderV1) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error { +func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error { *lbls = (*lbls)[:0] *chks = (*chks)[:0] diff --git a/index/index_test.go b/index/index_test.go index 83b6ef6574..65ef9b6e67 100644 --- a/index/index_test.go +++ b/index/index_test.go @@ -160,7 +160,7 @@ func TestIndexRW_Create_Open(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, iw.Close()) - ir, err := NewFileReader(fn) + ir, err := NewFileReader(fn, 1) testutil.Ok(t, err) testutil.Ok(t, ir.Close()) @@ -170,7 +170,7 @@ func TestIndexRW_Create_Open(t *testing.T) { _, err = f.WriteAt([]byte{0, 0}, 0) testutil.Ok(t, err) - _, err = NewFileReader(dir) + _, err = NewFileReader(dir, 1) testutil.NotOk(t, err) } @@ -213,7 +213,7 @@ func TestIndexRW_Postings(t *testing.T) { testutil.Ok(t, iw.Close()) - ir, err := NewFileReader(fn) + ir, err := NewFileReader(fn, 2) testutil.Ok(t, err) p, err := ir.Postings("a", "1") @@ -331,7 +331,7 @@ func TestPersistence_index_e2e(t *testing.T) { err = iw.Close() testutil.Ok(t, err) - ir, err := NewFileReader(filepath.Join(dir, "index")) + ir, err := NewFileReader(filepath.Join(dir, "index"), 2) testutil.Ok(t, err) for p := range mi.postings {