From 1d5f85817dcd582090fbc42e62e74d4e192b6cf6 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 8 Sep 2017 08:48:19 +0200 Subject: [PATCH 1/5] Fix various races --- head.go | 35 +++++++++++++++++++---------------- wal.go | 14 ++++++++++++-- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/head.go b/head.go index a74552bcaf..4e1e858773 100644 --- a/head.go +++ b/head.go @@ -402,9 +402,11 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if s == nil { return errors.Wrap(ErrNotFound, "unknown series") } + s.Lock() if err := s.appendable(t, v); err != nil { return err } + s.Unlock() if t < a.mint { return ErrOutOfBounds @@ -435,7 +437,10 @@ func (a *headAppender) Commit() error { total := len(a.samples) for _, s := range a.samples { + s.series.Lock() ok, chunkCreated := s.series.append(s.T, s.V) + s.series.Unlock() + if !ok { total-- } @@ -672,9 +677,9 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { s := h.head.series.getByID(sid) - s.mtx.RLock() + s.Lock() c := s.chunk(int(cid)) - s.mtx.RUnlock() + s.Unlock() // Do not expose chunks that are outside of the specified range. if c == nil || !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { @@ -694,9 +699,10 @@ type safeChunk struct { } func (c *safeChunk) Iterator() chunks.Iterator { - c.s.mtx.RLock() - defer c.s.mtx.RUnlock() - return c.s.iterator(c.cid) + c.s.Lock() + it := c.s.iterator(c.cid) + c.s.Unlock() + return it } // func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") } @@ -803,8 +809,8 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM } *lbls = append((*lbls)[:0], s.lset...) - s.mtx.RLock() - defer s.mtx.RUnlock() + s.Lock() + defer s.Unlock() *chks = (*chks)[:0] @@ -956,11 +962,11 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { for hash, all := range s.hashes[i] { for _, series := range all { - series.mtx.Lock() + series.Lock() rmChunks += series.truncateChunksBefore(mint) if len(series.chunks) > 0 { - series.mtx.Unlock() + series.Unlock() continue } @@ -983,7 +989,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { s.locks[j].Unlock() } - series.mtx.Unlock() + series.Unlock() } } @@ -1040,8 +1046,10 @@ type sample struct { v float64 } +// memSeries is the in-memory representation of a series. None of its methods +// are goroutine safe and its the callers responsibility to lock it. type memSeries struct { - mtx sync.RWMutex + sync.Mutex ref uint64 lset labels.Labels @@ -1143,8 +1151,6 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) { func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { const samplesPerChunk = 120 - s.mtx.Lock() - c := s.head() if c == nil { @@ -1152,7 +1158,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { chunkCreated = true } if c.maxTime >= t { - s.mtx.Unlock() return false, chunkCreated } if c.chunk.NumSamples() > samplesPerChunk/4 && t >= s.nextAt { @@ -1175,8 +1180,6 @@ func (s *memSeries) append(t int64, v float64) (success, chunkCreated bool) { s.sampleBuf[2] = s.sampleBuf[3] s.sampleBuf[3] = sample{t: t, v: v} - s.mtx.Unlock() - return true, chunkCreated } diff --git a/wal.go b/wal.go index 68c48838cc..747510fd60 100644 --- a/wal.go +++ b/wal.go @@ -398,6 +398,10 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error { buf := w.getBuffer() flag := w.encodeSeries(buf, series) + + w.mtx.Lock() + defer w.mtx.Unlock() + err := w.write(WALEntrySeries, flag, buf.get()) w.putBuffer(buf) @@ -425,6 +429,10 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { buf := w.getBuffer() flag := w.encodeSamples(buf, samples) + + w.mtx.Lock() + defer w.mtx.Unlock() + err := w.write(WALEntrySamples, flag, buf.get()) w.putBuffer(buf) @@ -451,6 +459,10 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { buf := w.getBuffer() flag := w.encodeDeletes(buf, stones) + + w.mtx.Lock() + defer w.mtx.Unlock() + err := w.write(WALEntryDeletes, flag, buf.get()) w.putBuffer(buf) @@ -661,8 +673,6 @@ const ( ) func (w *SegmentWAL) write(t WALEntryType, flag uint8, buf []byte) error { - w.mtx.Lock() - defer w.mtx.Unlock() // Cut to the next segment if the entry exceeds the file size unless it would also // exceed the size of a new segment. // TODO(gouthamve): Add a test for this case where the commit is greater than segmentSize. From 6892fc6dcb7c5e6cf44764e3f893f55236774aa1 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 8 Sep 2017 10:12:28 +0200 Subject: [PATCH 2/5] Finish old WAL segment async, default to no fsync We were still fsyncing while holding the write lock when we cut a new segment. Given we cannot do anything but logging errors, we might just as well complete segments asynchronously. There's not realistic use case where one would fsync after every WAL entry, thus make the default of a flush interval of 0 to never fsync which is a much more likely use case. --- cmd/tsdb/main.go | 3 ++- head.go | 7 ++++--- wal.go | 49 +++++++++++++++++++++++------------------------- wal_test.go | 3 +-- 4 files changed, 30 insertions(+), 32 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 491c2ac35c..960c9c05e3 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -139,7 +139,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { dur := measureTime("ingestScrapes", func() { b.startProfiling() - total, err = b.ingestScrapes(metrics, 2000) + total, err = b.ingestScrapes(metrics, 15000) if err != nil { exitWithError(err) } @@ -147,6 +147,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { fmt.Println(" > total samples:", total) fmt.Println(" > samples/sec:", float64(total)/dur.Seconds()) + select {} measureTime("stopStorage", func() { if err := b.storage.Close(); err != nil { diff --git a/head.go b/head.go index 4e1e858773..b2d6e9842d 100644 --- a/head.go +++ b/head.go @@ -403,11 +403,12 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { return errors.Wrap(ErrNotFound, "unknown series") } s.Lock() - if err := s.appendable(t, v); err != nil { - return err - } + err := s.appendable(t, v) s.Unlock() + if err != nil { + return err + } if t < a.mint { return ErrOutOfBounds } diff --git a/wal.go b/wal.go index 747510fd60..9af9a18536 100644 --- a/wal.go +++ b/wal.go @@ -417,10 +417,6 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error { tf.minSeries = s.Ref } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -447,10 +443,6 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error { tf.maxTime = s.T } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -479,10 +471,6 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error { } } } - - if w.flushInterval <= 0 { - return errors.Wrap(w.Sync(), "sync") - } return nil } @@ -537,19 +525,26 @@ func (w *SegmentWAL) createSegmentFile(name string) (*os.File, error) { func (w *SegmentWAL) cut() error { // Sync current head to disk and close. if hf := w.head(); hf != nil { - if err := w.sync(); err != nil { - return err - } - off, err := hf.Seek(0, os.SEEK_CUR) - if err != nil { - return err - } - if err := hf.Truncate(off); err != nil { - return err - } - if err := hf.Close(); err != nil { + if err := w.flush(); err != nil { return err } + // Finish last segment asynchronously to not block the WAL moving along + // in the new segment. + go func() { + off, err := hf.Seek(0, os.SEEK_CUR) + if err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Truncate(off); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Sync(); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + if err := hf.Close(); err != nil { + w.logger.Log("msg", "finish old segment", "segment", hf.Name(), "err", err) + } + }() } p, _, err := nextSequenceFile(w.dirFile.Name()) @@ -561,9 +556,11 @@ func (w *SegmentWAL) cut() error { return err } - if err = w.dirFile.Sync(); err != nil { - return err - } + go func() { + if err = w.dirFile.Sync(); err != nil { + w.logger.Log("msg", "sync WAL directory", "err", err) + } + }() w.files = append(w.files, newSegmentFile(f)) diff --git a/wal_test.go b/wal_test.go index 45279b1b5f..180623dd0f 100644 --- a/wal_test.go +++ b/wal_test.go @@ -91,9 +91,8 @@ func TestSegmentWAL_cut(t *testing.T) { require.NoError(t, w.cut(), "cut failed") - // Cutting creates a new file and close the previous tail file. + // Cutting creates a new file. require.Equal(t, 2, len(w.files)) - require.Error(t, w.files[0].Close()) require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!"))) From f904cd385f4a6595f7b31f9c75e77eccff5c46d6 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 8 Sep 2017 15:09:24 +0200 Subject: [PATCH 3/5] Do not build a superflous 'all' postings --- compact.go | 13 ------------- db.go | 6 ++++++ head.go | 3 --- 3 files changed, 6 insertions(+), 16 deletions(-) diff --git a/compact.go b/compact.go index 7d8174f0d3..a3bc7d17a3 100644 --- a/compact.go +++ b/compact.go @@ -17,7 +17,6 @@ import ( "math/rand" "os" "path/filepath" - "runtime" "sort" "time" @@ -365,10 +364,6 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } c.metrics.ran.Inc() c.metrics.duration.Observe(time.Since(t).Seconds()) - - // We might have done quite a few allocs. Enforce a GC so they do not accumulate - // with subsequent compactions or head GCs. - runtime.GC() }(time.Now()) dir := filepath.Join(dest, meta.ULID.String()) @@ -570,14 +565,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, return errors.Wrap(err, "write postings") } } - // Write a postings list containing all series. - all := make([]uint64, i) - for i := range all { - all[i] = uint64(i) - } - if err := indexw.WritePostings("", "", newListPostings(all)); err != nil { - return errors.Wrap(err, "write 'all' postings") - } return nil } diff --git a/db.go b/db.go index ad034d8b02..c9745cfc6f 100644 --- a/db.go +++ b/db.go @@ -21,6 +21,7 @@ import ( "io/ioutil" "os" "path/filepath" + "runtime" "sort" "strconv" "sync" @@ -349,9 +350,12 @@ func (db *DB) compact() (changes bool, err error) { } changes = true + runtime.GC() + if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } + runtime.GC() } // Check for compactions of multiple blocks. @@ -380,10 +384,12 @@ func (db *DB) compact() (changes bool, err error) { return changes, errors.Wrap(err, "delete compacted block") } } + runtime.GC() if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } + runtime.GC() } return changes, nil diff --git a/head.go b/head.go index b2d6e9842d..ea7b63f8a6 100644 --- a/head.go +++ b/head.go @@ -15,7 +15,6 @@ package tsdb import ( "math" - "runtime" "sort" "sync" "sync/atomic" @@ -515,8 +514,6 @@ Outer: // gc removes data before the minimum timestmap from the head. func (h *Head) gc() { - defer runtime.GC() - // Only data strictly lower than this timestamp must be deleted. mint := h.MinTime() From b09d90c79c4821063f9151daf82c4266f73f9c9e Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 8 Sep 2017 15:15:12 +0200 Subject: [PATCH 4/5] Add decoding method to retrieve unsafe strings When decoding data from mmaped blocks, we would like to retrieve a string backed by the mmaped region. As the underlying byte slice never changes, this is safe. --- encoding_helpers.go | 16 ++++++++++++++++ index.go | 10 +++------- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/encoding_helpers.go b/encoding_helpers.go index 9aa4ba4097..17c3ff0811 100644 --- a/encoding_helpers.go +++ b/encoding_helpers.go @@ -77,6 +77,22 @@ func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } func (d *decbuf) be32int() int { return int(d.be32()) } func (d *decbuf) be64int64() int64 { return int64(d.be64()) } +// uvarintTempStr decodes like uvarintStr but the returned string is +// not safe to use if the underyling buffer changes. +func (d *decbuf) uvarintTempStr() string { + l := d.uvarint64() + if d.e != nil { + return "" + } + if len(d.b) < int(l) { + d.e = errInvalidSize + return "" + } + s := yoloString(d.b[:l]) + d.b = d.b[l:] + return s +} + func (d *decbuf) uvarintStr() string { l := d.uvarint64() if d.e != nil { diff --git a/index.go b/index.go index ddc2c4f52a..e65f3fec15 100644 --- a/index.go +++ b/index.go @@ -335,10 +335,6 @@ func (w *indexWriter) AddSymbols(sym map[string]struct{}) error { for _, s := range symbols { w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len()) - - // NOTE: len(s) gives the number of runes, not the number of bytes. - // Therefore the read-back length for strings with unicode characters will - // be off when not using putUvarintStr. w.buf2.putUvarintStr(s) } @@ -636,7 +632,7 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { keys := make([]string, 0, keyCount) for i := 0; i < keyCount; i++ { - keys = append(keys, d2.uvarintStr()) + keys = append(keys, d2.uvarintTempStr()) } res[strings.Join(keys, sep)] = uint32(d2.uvarint()) @@ -673,7 +669,7 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) { func (r *indexReader) lookupSymbol(o uint32) (string, error) { d := r.decbufAt(int(o)) - s := d.uvarintStr() + s := d.uvarintTempStr() if d.err() != nil { return "", errors.Wrapf(d.err(), "read symbol at %d", o) } @@ -688,7 +684,7 @@ func (r *indexReader) Symbols() (map[string]struct{}, error) { sym := make(map[string]struct{}, count) for ; count > 0; count-- { - s := d2.uvarintStr() + s := d2.uvarintTempStr() sym[s] = struct{}{} } From 24362567b91a099c5a1f5ce822f83f55516bf6de Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Mon, 11 Sep 2017 10:33:17 +0200 Subject: [PATCH 5/5] Fix test flakes --- wal_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/wal_test.go b/wal_test.go index 180623dd0f..a3177ee6ac 100644 --- a/wal_test.go +++ b/wal_test.go @@ -382,7 +382,7 @@ func TestWALRestoreCorrupted(t *testing.T) { t.Run(c.name, func(t *testing.T) { // Generate testing data. It does not make semantical sense but // for the purpose of this test. - dir, err := ioutil.TempDir("", "test_corrupted_checksum") + dir, err := ioutil.TempDir("", "test_corrupted") require.NoError(t, err) defer os.RemoveAll(dir) @@ -399,6 +399,7 @@ func TestWALRestoreCorrupted(t *testing.T) { require.NoError(t, w.Close()) + w.files[0].Sync() // Corrupt the second entry in the first file. // After re-opening we must be able to read the first entry // and the rest, including the second file, must be truncated for clean further