diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index e24b720d2a..91024d2826 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -852,8 +852,10 @@ func BenchmarkCompaction(b *testing.B) { b.ResetTimer() b.ReportAllocs() - _, err = c.Compact(dir, blockDirs, blocks) - testutil.Ok(b, err) + for i := 0; i < b.N; i++ { + _, err = c.Compact(dir, blockDirs, blocks) + testutil.Ok(b, err) + } }) } } diff --git a/tsdb/encoding/encoding.go b/tsdb/encoding/encoding.go index a732a6048e..3e2e2804a0 100644 --- a/tsdb/encoding/encoding.go +++ b/tsdb/encoding/encoding.go @@ -75,10 +75,20 @@ func (e *Encbuf) PutUvarintStr(s string) { // PutHash appends a hash over the buffers current contents to the buffer. func (e *Encbuf) PutHash(h hash.Hash) { h.Reset() + e.WriteToHash(h) + e.PutHashSum(h) +} + +// WriteToHash writes the current buffer contents to the given hash. +func (e *Encbuf) WriteToHash(h hash.Hash) { _, err := h.Write(e.B) if err != nil { panic(err) // The CRC32 implementation does not error } +} + +// PutHashSum writes the Sum of the given hash to the buffer. +func (e *Encbuf) PutHashSum(h hash.Hash) { e.B = h.Sum(e.B) } diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 1e601ccf32..592cb1f697 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -236,6 +236,14 @@ func (w *Writer) write(bufs ...[]byte) error { return nil } +func (w *Writer) writeAt(buf []byte, pos uint64) error { + if err := w.fbuf.Flush(); err != nil { + return err + } + _, err := w.f.WriteAt(buf, int64(pos)) + return err +} + // addPadding adds zero byte padding until the file size is a multiple size. func (w *Writer) addPadding(size int) error { p := w.pos % uint64(size) @@ -382,23 +390,42 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error { } sort.Strings(symbols) - w.buf1.Reset() - w.buf2.Reset() + startPos := w.pos + // Leave 4 bytes of space for the length, which will be calculated later. + if err := w.write([]byte("alen")); err != nil { + return err + } + w.crc32.Reset() - w.buf2.PutBE32int(len(symbols)) + w.buf1.Reset() + w.buf1.PutBE32int(len(symbols)) + w.buf1.WriteToHash(w.crc32) + if err := w.write(w.buf1.Get()); err != nil { + return err + } w.symbols = make(map[string]uint32, len(symbols)) for index, s := range symbols { w.symbols[s] = uint32(index) - w.buf2.PutUvarintStr(s) + w.buf1.Reset() + w.buf1.PutUvarintStr(s) + w.buf1.WriteToHash(w.crc32) + if err := w.write(w.buf1.Get()); err != nil { + return err + } } - w.buf1.PutBE32int(w.buf2.Len()) - w.buf2.PutHash(w.crc32) + // Write out the length. + w.buf1.Reset() + w.buf1.PutBE32int(int(w.pos - startPos - 4)) + if err := w.writeAt(w.buf1.Get(), startPos); err != nil { + return err + } - err := w.write(w.buf1.Get(), w.buf2.Get()) - return errors.Wrap(err, "write symbols") + w.buf1.Reset() + w.buf1.PutHashSum(w.crc32) + return w.write(w.buf1.Get()) } func (w *Writer) WriteLabelIndex(names []string, values []string) error { @@ -425,9 +452,20 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { offset: w.pos, }) - w.buf2.Reset() - w.buf2.PutBE32int(len(names)) - w.buf2.PutBE32int(valt.Len()) + startPos := w.pos + // Leave 4 bytes of space for the length, which will be calculated later. + if err := w.write([]byte("alen")); err != nil { + return err + } + w.crc32.Reset() + + w.buf1.Reset() + w.buf1.PutBE32int(len(names)) + w.buf1.PutBE32int(valt.Len()) + w.buf1.WriteToHash(w.crc32) + if err := w.write(w.buf1.Get()); err != nil { + return err + } // here we have an index for the symbol file if v2, otherwise it's an offset for _, v := range valt.entries { @@ -435,55 +473,104 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { if !ok { return errors.Errorf("symbol entry for %q does not exist", v) } - w.buf2.PutBE32(index) + w.buf1.Reset() + w.buf1.PutBE32(index) + w.buf1.WriteToHash(w.crc32) + if err := w.write(w.buf1.Get()); err != nil { + return err + } + } + + // Write out the length. + w.buf1.Reset() + w.buf1.PutBE32int(int(w.pos - startPos - 4)) + if err := w.writeAt(w.buf1.Get(), startPos); err != nil { + return err } w.buf1.Reset() - w.buf1.PutBE32int(w.buf2.Len()) - - w.buf2.PutHash(w.crc32) - - err = w.write(w.buf1.Get(), w.buf2.Get()) - return errors.Wrap(err, "write label index") + w.buf1.PutHashSum(w.crc32) + return w.write(w.buf1.Get()) } // writeLabelIndexesOffsetTable writes the label indices offset table. func (w *Writer) writeLabelIndexesOffsetTable() error { - w.buf2.Reset() - w.buf2.PutBE32int(len(w.labelIndexes)) + startPos := w.pos + // Leave 4 bytes of space for the length, which will be calculated later. + if err := w.write([]byte("alen")); err != nil { + return err + } + w.crc32.Reset() + + w.buf1.Reset() + w.buf1.PutBE32int(len(w.labelIndexes)) + w.buf1.WriteToHash(w.crc32) + if err := w.write(w.buf1.Get()); err != nil { + return err + } for _, e := range w.labelIndexes { - w.buf2.PutUvarint(len(e.keys)) + w.buf1.Reset() + w.buf1.PutUvarint(len(e.keys)) for _, k := range e.keys { - w.buf2.PutUvarintStr(k) + w.buf1.PutUvarintStr(k) } - w.buf2.PutUvarint64(e.offset) + w.buf1.PutUvarint64(e.offset) + w.buf1.WriteToHash(w.crc32) + if err := w.write(w.buf1.Get()); err != nil { + return err + } + } + // Write out the length. + w.buf1.Reset() + w.buf1.PutBE32int(int(w.pos - startPos - 4)) + if err := w.writeAt(w.buf1.Get(), startPos); err != nil { + return err } w.buf1.Reset() - w.buf1.PutBE32int(w.buf2.Len()) - w.buf2.PutHash(w.crc32) - - return w.write(w.buf1.Get(), w.buf2.Get()) + w.buf1.PutHashSum(w.crc32) + return w.write(w.buf1.Get()) } // writePostingsOffsetTable writes the postings offset table. func (w *Writer) writePostingsOffsetTable() error { - w.buf2.Reset() - w.buf2.PutBE32int(len(w.postings)) + startPos := w.pos + // Leave 4 bytes of space for the length, which will be calculated later. + if err := w.write([]byte("alen")); err != nil { + return err + } + w.crc32.Reset() + + w.buf1.Reset() + w.buf1.PutBE32int(len(w.postings)) + w.buf1.WriteToHash(w.crc32) + if err := w.write(w.buf1.Get()); err != nil { + return err + } for _, e := range w.postings { - w.buf2.PutUvarint(2) - w.buf2.PutUvarintStr(e.name) - w.buf2.PutUvarintStr(e.value) - w.buf2.PutUvarint64(e.offset) + w.buf1.Reset() + w.buf1.PutUvarint(2) + w.buf1.PutUvarintStr(e.name) + w.buf1.PutUvarintStr(e.value) + w.buf1.PutUvarint64(e.offset) + w.buf1.WriteToHash(w.crc32) + if err := w.write(w.buf1.Get()); err != nil { + return err + } + } + + // Write out the length. + w.buf1.Reset() + w.buf1.PutBE32int(int(w.pos - startPos - 4)) + if err := w.writeAt(w.buf1.Get(), startPos); err != nil { + return err } w.buf1.Reset() - w.buf1.PutBE32int(w.buf2.Len()) - w.buf2.PutHash(w.crc32) - - return w.write(w.buf1.Get(), w.buf2.Get()) + w.buf1.PutHashSum(w.crc32) + return w.write(w.buf1.Get()) } const indexTOCLen = 6*8 + 4 @@ -539,21 +626,40 @@ func (w *Writer) WritePostings(name, value string, it Postings) error { } sort.Sort(uint32slice(refs)) - w.buf2.Reset() - w.buf2.PutBE32int(len(refs)) + startPos := w.pos + // Leave 4 bytes of space for the length, which will be calculated later. + if err := w.write([]byte("alen")); err != nil { + return err + } + w.crc32.Reset() + + w.buf1.Reset() + w.buf1.PutBE32int(len(refs)) + w.buf1.WriteToHash(w.crc32) + if err := w.write(w.buf1.Get()); err != nil { + return err + } for _, r := range refs { - w.buf2.PutBE32(r) + w.buf1.Reset() + w.buf1.PutBE32(r) + w.buf1.WriteToHash(w.crc32) + if err := w.write(w.buf1.Get()); err != nil { + return err + } } w.uint32s = refs + // Write out the length. w.buf1.Reset() - w.buf1.PutBE32int(w.buf2.Len()) + w.buf1.PutBE32int(int(w.pos - startPos - 4)) + if err := w.writeAt(w.buf1.Get(), startPos); err != nil { + return err + } - w.buf2.PutHash(w.crc32) - - err := w.write(w.buf1.Get(), w.buf2.Get()) - return errors.Wrap(err, "write postings") + w.buf1.Reset() + w.buf1.PutHashSum(w.crc32) + return w.write(w.buf1.Get()) } type uint32slice []uint32