diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 491c2ac35c..960c9c05e3 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -139,7 +139,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { dur := measureTime("ingestScrapes", func() { b.startProfiling() - total, err = b.ingestScrapes(metrics, 2000) + total, err = b.ingestScrapes(metrics, 15000) if err != nil { exitWithError(err) } @@ -147,6 +147,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { fmt.Println(" > total samples:", total) fmt.Println(" > samples/sec:", float64(total)/dur.Seconds()) + select {} measureTime("stopStorage", func() { if err := b.storage.Close(); err != nil { diff --git a/compact.go b/compact.go index 7d8174f0d3..a3bc7d17a3 100644 --- a/compact.go +++ b/compact.go @@ -17,7 +17,6 @@ import ( "math/rand" "os" "path/filepath" - "runtime" "sort" "time" @@ -365,10 +364,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } c.metrics.ran.Inc() c.metrics.duration.Observe(time.Since(t).Seconds()) - - // We might have done quite a few allocs. Enforce a GC so they do not accumulate - // with subsequent compactions or head GCs. - runtime.GC() }(time.Now()) dir := filepath.Join(dest, meta.ULID.String()) @@ -570,14 +565,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return errors.Wrap(err, "write postings") } } - // Write a postings list containing all series. 
- all := make([]uint64, i) - for i := range all { - all[i] = uint64(i) - } - if err := indexw.WritePostings("", "", newListPostings(all)); err != nil { - return errors.Wrap(err, "write 'all' postings") - } return nil } diff --git a/db.go b/db.go index ad034d8b02..c9745cfc6f 100644 --- a/db.go +++ b/db.go @@ -21,6 +21,7 @@ import ( "io/ioutil" "os" "path/filepath" + "runtime" "sort" "strconv" "sync" @@ -349,9 +350,12 @@ func (db *DB) compact() (changes bool, err error) { } changes = true + runtime.GC() + if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } + runtime.GC() } // Check for compactions of multiple blocks. @@ -380,10 +384,12 @@ func (db *DB) compact() (changes bool, err error) { return changes, errors.Wrap(err, "delete compacted block") } } + runtime.GC() if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } + runtime.GC() } return changes, nil diff --git a/encoding_helpers.go b/encoding_helpers.go index 9aa4ba4097..17c3ff0811 100644 --- a/encoding_helpers.go +++ b/encoding_helpers.go @@ -77,6 +77,22 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } func (d *decbuf) be32int() int { return int(d.be32()) } func (d *decbuf) be64int64() int64 { return int64(d.be64()) } +// uvarintTempStr decodes like uvarintStr but the returned string is +// not safe to use if the underlying buffer changes. 
+func (d *decbuf) uvarintTempStr() string { + l := d.uvarint64() + if d.e != nil { + return "" + } + if len(d.b) < int(l) { + d.e = errInvalidSize + return "" + } + s := yoloString(d.b[:l]) + d.b = d.b[l:] + return s +} + func (d *decbuf) uvarintStr() string { l := d.uvarint64() if d.e != nil { diff --git a/head.go b/head.go index a74552bcaf..ea7b63f8a6 100644 --- a/head.go +++ b/head.go @@ -15,7 +15,6 @@ package tsdb import ( "math" - "runtime" "sort" "sync" "sync/atomic" @@ -402,10 +401,13 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if s == nil { return errors.Wrap(ErrNotFound, "unknown series") } - if err := s.appendable(t, v); err != nil { + s.Lock() + err := s.appendable(t, v) + s.Unlock() + + if err != nil { return err } - if t < a.mint { return ErrOutOfBounds } @@ -435,7 +437,10 @@ func (a *headAppender) Commit() error { total := len(a.samples) for _, s := range a.samples { + s.series.Lock() ok, chunkCreated := s.series.append(s.T, s.V) + s.series.Unlock() + if !ok { total-- } @@ -509,8 +514,6 @@ Outer: // gc removes data before the minimum timestmap from the head. func (h *Head) gc() { - defer runtime.GC() - // Only data strictly lower than this timestamp must be deleted. mint := h.MinTime() @@ -672,9 +675,9 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { s := h.head.series.getByID(sid) - s.mtx.RLock() + s.Lock() c := s.chunk(int(cid)) - s.mtx.RUnlock() + s.Unlock() // Do not expose chunks that are outside of the specified range. 
if c == nil || !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { @@ -694,9 +697,10 @@ type safeChunk struct { } func (c *safeChunk) Iterator() chunks.Iterator { - c.s.mtx.RLock() - defer c.s.mtx.RUnlock() - return c.s.iterator(c.cid) + c.s.Lock() + it := c.s.iterator(c.cid) + c.s.Unlock() + return it } // func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") } @@ -803,8 +807,8 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM } *lbls = append((*lbls)[:0], s.lset...) - s.mtx.RLock() - defer s.mtx.RUnlock() + s.Lock() + defer s.Unlock() *chks = (*chks)[:0] @@ -956,11 +960,11 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { for hash, all := range s.hashes[i] { for _, series := range all { - series.mtx.Lock() + series.Lock() rmChunks += series.truncateChunksBefore(mint) if len(series.chunks) > 0 { - series.mtx.Unlock() + series.Unlock() continue } @@ -983,7 +987,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { s.locks[j].Unlock() } - series.mtx.Unlock() + series.Unlock() } } @@ -1040,8 +1044,10 @@ type sample struct { v float64 } +// memSeries is the in-memory representation of a series. None of its methods +// are goroutine safe and it's the caller's responsibility to lock it. 
type memSeries struct { - mtx sync.RWMutex + sync.Mutex ref uint64 lset labels.Labels @@ -1143,8 +1149,6 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { const samplesPerChunk = 120 - s.mtx.Lock() - c := s.head() if c == nil { @@ -1152,7 +1156,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { chunkCreated = true } if c.maxTime >= t { - s.mtx.Unlock() return false, chunkCreated } if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt { @@ -1175,8 +1178,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { s.sampleBuf[2] = s.sampleBuf[3] s.sampleBuf[3] = sample{t: t, v: v} - s.mtx.Unlock() - return true, chunkCreated } diff --git a/index.go b/index.go index 06f347aaa1..fd9b251623 100644 --- a/index.go +++ b/index.go @@ -347,10 +347,6 @@ func (w *indexWriter) AddSymbols(sym map[string]struct{}) error { for _, s := range symbols { w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len()) - - // NOTE: len(s) gives the number of runes, not the number of bytes. - // Therefore the read-back length for strings with unicode characters will - // be off when not using putUvarintStr. 
w.buf2.putUvarintStr(s) } @@ -648,7 +644,7 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { keys := make([]string, 0, keyCount) for i := 0; i < keyCount; i++ { - keys = append(keys, d2.uvarintStr()) + keys = append(keys, d2.uvarintTempStr()) } res[strings.Join(keys, sep)] = uint32(d2.uvarint()) @@ -685,7 +681,7 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) { func (r *indexReader) lookupSymbol(o uint32) (string, error) { d := r.decbufAt(int(o)) - s := d.uvarintStr() + s := d.uvarintTempStr() if d.err() != nil { return "", errors.Wrapf(d.err(), "read symbol at %d", o) } @@ -700,7 +696,7 @@ func (r *indexReader) Symbols() (map[string]struct{}, error) { sym := make(map[string]struct{}, count) for ; count > 0; count-- { - s := d2.uvarintStr() + s := d2.uvarintTempStr() sym[s] = struct{}{} } diff --git a/wal.go b/wal.go index 68c48838cc..9af9a18536 100644 --- a/wal.go +++ b/wal.go @@ -398,6 +398,10 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error { buf := w.getBuffer() flag := w.encodeSeries(buf, series) + + w.mtx.Lock() + defer w.mtx.Unlock() + err := w.write(WALEntrySeries, flag, buf.get()) w.putBuffer(buf) @@ -413,10 +417,6 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error { tf.minSeries = s.Ref } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -425,6 +425,10 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { buf := w.getBuffer() flag := w.encodeSamples(buf, samples) + + w.mtx.Lock() + defer w.mtx.Unlock() + err := w.write(WALEntrySamples, flag, buf.get()) w.putBuffer(buf) @@ -439,10 +443,6 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { tf.maxTime = s.T } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -451,6 +451,10 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { buf := w.getBuffer() flag := w.encodeDeletes(buf, stones) + + w.mtx.Lock() + defer w.mtx.Unlock() + err := 
w.write(WALEntryDeletes, flag, buf.get()) w.putBuffer(buf) @@ -467,10 +471,6 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { } } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -525,19 +525,26 @@ func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) { func (w *SegmentWAL) cut() error { // Sync current head to disk and close. if hf := w.head(); hf != nil { - if err := w.sync(); err != nil { - return err - } - off, err := hf.Seek(0, os.SEEK_CUR) - if err != nil { - return err - } - if err := hf.Truncate(off); err != nil { - return err - } - if err := hf.Close(); err != nil { + if err := w.flush(); err != nil { return err } + // Finish last segment asynchronously to not block the WAL moving along + // in the new segment. + go func() { + off, err := hf.Seek(0, os.SEEK_CUR) + if err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Truncate(off); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Sync(); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Close(); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + }() } p, _, err := nextSequenceFile(w.dirFile.Name()) @@ -549,9 +556,11 @@ func (w *SegmentWAL) cut() error { return err } - if err = w.dirFile.Sync(); err != nil { - return err - } + go func() { + if err = w.dirFile.Sync(); err != nil { + w.logger.Log("msg", "sync WAL directory", "err", err) + } + }() w.files = append(w.files, newSegmentFile(f)) @@ -661,8 +670,6 @@ const ( ) func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error { - w.mtx.Lock() - defer w.mtx.Unlock() // Cut to the next segment if the entry exceeds the file size unless it would also // exceed the size of a new segment. 
// TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize. diff --git a/wal_test.go b/wal_test.go index 45279b1b5f..f36b22e90e 100644 --- a/wal_test.go +++ b/wal_test.go @@ -91,9 +91,8 @@ func TestSegmentWAL_cut(t *testing.T) { require.NoError(t, w.cut(), "cut failed") - // Cutting creates a new file and close the previous tail file. + // Cutting creates a new file. require.Equal(t, 2, len(w.files)) - require.Error(t, w.files[0].Close()) require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!"))) @@ -383,7 +382,7 @@ func TestWALRestoreCorrupted(t *testing.T) { t.Run(c.name, func(t *testing.T) { // Generate testing data. It does not make semantical sense but // for the purpose of this test. - dir, err := ioutil.TempDir("", "test_corrupted_checksum") + dir, err := ioutil.TempDir("", "test_corrupted") require.NoError(t, err) defer os.RemoveAll(dir) @@ -400,6 +399,10 @@ func TestWALRestoreCorrupted(t *testing.T) { require.NoError(t, w.Close()) + // cut() truncates and fsyncs the first segment async. If it happens after + // the corruption we apply below, the corruption will be overwritten again. + // Fire and forget a sync to avoid flakiness. + w.files[0].Sync() // Corrupt the second entry in the first file. // After re-opening we must be able to read the first entry // and the rest, including the second file, must be truncated for clean further