From d5d7a097e139465fc4c3c24fcfc87a6ead9f39e5 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Fri, 4 Jan 2019 15:45:50 +0100 Subject: [PATCH 01/24] Update Makefile.common This change also uses the latest staticcheck version which comes with new verifications, hence some clean up in the code. Signed-off-by: Simon Pasquier --- Makefile.common | 56 +++++++++++++++++++++++---------------- chunkenc/bstream.go | 2 ++ chunkenc/chunk_test.go | 2 +- chunks/chunks.go | 4 +-- cmd/tsdb/main.go | 8 +----- compact.go | 1 - db_test.go | 4 --- encoding_helpers.go | 2 ++ fileutil/fileutil.go | 3 +-- head.go | 5 ---- head_test.go | 1 - index/encoding_helpers.go | 2 ++ index/index.go | 4 --- index/postings.go | 5 ++-- index/postings_test.go | 16 ----------- querier_test.go | 21 --------------- staticcheck.conf | 2 -- tombstones_test.go | 1 - 18 files changed, 45 insertions(+), 94 deletions(-) delete mode 100644 staticcheck.conf diff --git a/Makefile.common b/Makefile.common index 741579e60f..fff85f9226 100644 --- a/Makefile.common +++ b/Makefile.common @@ -29,6 +29,8 @@ GO ?= go GOFMT ?= $(GO)fmt FIRST_GOPATH := $(firstword $(subst :, ,$(shell $(GO) env GOPATH))) GOOPTS ?= +GOHOSTOS ?= $(shell $(GO) env GOHOSTOS) +GOHOSTARCH ?= $(shell $(GO) env GOHOSTARCH) GO_VERSION ?= $(shell $(GO) version) GO_VERSION_NUMBER ?= $(word 3, $(GO_VERSION)) @@ -62,17 +64,30 @@ PROMU := $(FIRST_GOPATH)/bin/promu STATICCHECK := $(FIRST_GOPATH)/bin/staticcheck pkgs = ./... -GO_VERSION ?= $(shell $(GO) version) -GO_BUILD_PLATFORM ?= $(subst /,-,$(lastword $(GO_VERSION))) +ifeq (arm, $(GOHOSTARCH)) + GOHOSTARM ?= $(shell GOARM= $(GO) env GOARM) + GO_BUILD_PLATFORM ?= $(GOHOSTOS)-$(GOHOSTARCH)v$(GOHOSTARM) +else + GO_BUILD_PLATFORM ?= $(GOHOSTOS)-$(GOHOSTARCH) +endif PROMU_VERSION ?= 0.2.0 PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_VERSION)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM).tar.gz +STATICCHECK_VERSION ?= 2019.1 +STATICCHECK_URL := https://github.com/dominikh/go-tools/releases/download/$(STATICCHECK_VERSION)/staticcheck_$(GOHOSTOS)_$(GOHOSTARCH) PREFIX ?= $(shell pwd) BIN_DIR ?= $(shell pwd) DOCKER_IMAGE_TAG ?= $(subst /,-,$(shell git rev-parse --abbrev-ref HEAD)) DOCKER_REPO ?= prom +ifeq ($(GOHOSTARCH),amd64) + ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux freebsd darwin windows)) + # Only supported on amd64 + test-flags := -race + endif +endif + .PHONY: all all: precheck style staticcheck unused build test @@ -110,12 +125,12 @@ common-test-short: .PHONY: common-test common-test: @echo ">> running all tests" - GO111MODULE=$(GO111MODULE) $(GO) test -race $(GOOPTS) $(pkgs) + GO111MODULE=$(GO111MODULE) $(GO) test $(test-flags) $(GOOPTS) $(pkgs) .PHONY: common-format common-format: @echo ">> formatting code" - GO111MODULE=$(GO111MODULE) $(GO) fmt $(GOOPTS) $(pkgs) + GO111MODULE=$(GO111MODULE) $(GO) fmt $(pkgs) .PHONY: common-vet common-vet: @@ -125,8 +140,12 @@ common-vet: .PHONY: common-staticcheck common-staticcheck: $(STATICCHECK) @echo ">> running staticcheck" + chmod +x $(STATICCHECK) ifdef GO111MODULE - GO111MODULE=$(GO111MODULE) $(STATICCHECK) -ignore "$(STATICCHECK_IGNORE)" -checks "SA*" $(pkgs) +# 'go list' needs to be executed before staticcheck to prepopulate the modules cache. +# Otherwise staticcheck might fail randomly for some reason not yet explained. + GO111MODULE=$(GO111MODULE) $(GO) list -e -compiled -test=true -export=false -deps=true -find=false -tags= -- ./... > /dev/null + GO111MODULE=$(GO111MODULE) $(STATICCHECK) -ignore "$(STATICCHECK_IGNORE)" $(pkgs) else $(STATICCHECK) -ignore "$(STATICCHECK_IGNORE)" $(pkgs) endif @@ -140,8 +159,9 @@ else ifdef GO111MODULE @echo ">> running check for unused/missing packages in go.mod" GO111MODULE=$(GO111MODULE) $(GO) mod tidy +ifeq (,$(wildcard vendor)) @git diff --exit-code -- go.sum go.mod -ifneq (,$(wildcard vendor)) +else @echo ">> running check for unused packages in vendor/" GO111MODULE=$(GO111MODULE) $(GO) mod vendor @git diff --exit-code -- go.sum go.mod vendor/ @@ -175,30 +195,20 @@ common-docker-tag-latest: promu: $(PROMU) $(PROMU): - curl -s -L $(PROMU_URL) | tar -xvz -C /tmp - mkdir -v -p $(FIRST_GOPATH)/bin - cp -v /tmp/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM)/promu $(PROMU) + $(eval PROMU_TMP := $(shell mktemp -d)) + curl -s -L $(PROMU_URL) | tar -xvzf - -C $(PROMU_TMP) + mkdir -p $(FIRST_GOPATH)/bin + cp $(PROMU_TMP)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM)/promu $(FIRST_GOPATH)/bin/promu + rm -r $(PROMU_TMP) .PHONY: proto proto: @echo ">> generating code from proto files" @./scripts/genproto.sh -.PHONY: $(STATICCHECK) $(STATICCHECK): -ifdef GO111MODULE -# Get staticcheck from a temporary directory to avoid modifying the local go.{mod,sum}. -# See https://github.com/golang/go/issues/27643. -# For now, we are using the next branch of staticcheck because master isn't compatible yet with Go modules. - tmpModule=$$(mktemp -d 2>&1) && \ - mkdir -p $${tmpModule}/staticcheck && \ - cd "$${tmpModule}"/staticcheck && \ - GO111MODULE=on $(GO) mod init example.com/staticcheck && \ - GO111MODULE=on GOOS= GOARCH= $(GO) get -u honnef.co/go/tools/cmd/staticcheck@next && \ - rm -rf $${tmpModule}; -else - GOOS= GOARCH= GO111MODULE=off $(GO) get -u honnef.co/go/tools/cmd/staticcheck -endif + mkdir -p $(FIRST_GOPATH)/bin + curl -s -L $(STATICCHECK_URL) > $(STATICCHECK) ifdef GOVENDOR .PHONY: $(GOVENDOR) diff --git a/chunkenc/bstream.go b/chunkenc/bstream.go index ef04d44ba8..72368dcf48 100644 --- a/chunkenc/bstream.go +++ b/chunkenc/bstream.go @@ -39,6 +39,8 @@ // OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +//lint:file-ignore U1000 Ignore all unused code. + package chunkenc import "io" diff --git a/chunkenc/chunk_test.go b/chunkenc/chunk_test.go index edabd749fe..45eb40cf51 100644 --- a/chunkenc/chunk_test.go +++ b/chunkenc/chunk_test.go @@ -32,7 +32,7 @@ func TestChunk(t *testing.T) { for enc, nc := range map[Encoding]func() Chunk{ EncXOR: func() Chunk { return NewXORChunk() }, } { - t.Run(fmt.Sprintf("%s", enc), func(t *testing.T) { + t.Run(fmt.Sprintf("%d", enc), func(t *testing.T) { for range make([]struct{}, 1) { c := nc() if err := testChunk(c); err != nil { diff --git a/chunks/chunks.go b/chunks/chunks.go index 5eab23982d..11278e0b91 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -64,9 +64,7 @@ func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool { } var ( - errInvalidSize = fmt.Errorf("invalid size") - errInvalidFlag = fmt.Errorf("invalid flag") - errInvalidChecksum = fmt.Errorf("invalid checksum") + errInvalidSize = fmt.Errorf("invalid size") ) var castagnoliTable *crc32.Table diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 01d85d590d..01a2858f1d 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -87,7 +87,7 @@ func main() { block = blocks[len(blocks)-1] } if block == nil { - exitWithError(fmt.Errorf("Block not found")) + exitWithError(fmt.Errorf("block not found")) } analyzeBlock(block, *analyzeLimit) } @@ -340,12 +340,6 @@ func measureTime(stage string, f func()) time.Duration { return time.Since(start) } -func mapToLabels(m map[string]interface{}, l *labels.Labels) { - for k, v := range m { - *l = append(*l, labels.Label{Name: k, Value: v.(string)}) - } -} - func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { scanner := bufio.NewScanner(r) diff --git a/compact.go b/compact.go index f8e6ff545c..ef31ec887b 100644 --- a/compact.go +++ b/compact.go @@ -66,7 +66,6 @@ type Compactor interface { // LeveledCompactor implements the Compactor interface. type LeveledCompactor struct { - dir string metrics *compactorMetrics logger log.Logger ranges []int64 diff --git a/db_test.go b/db_test.go index 92d487eecb..f835fc8d09 100644 --- a/db_test.go +++ b/db_test.go @@ -514,8 +514,6 @@ func TestDB_e2e(t *testing.T) { numDatapoints = 1000 numRanges = 1000 timeInterval = int64(3) - maxTime = int64(2 * 1000) - minTime = int64(200) ) // Create 8 series with 1000 data-points of different ranges and run queries. lbls := [][]labels.Label{ @@ -666,8 +664,6 @@ func TestDB_e2e(t *testing.T) { q.Close() } } - - return } func TestWALFlushedOnDBClose(t *testing.T) { diff --git a/encoding_helpers.go b/encoding_helpers.go index 6dd6e7c2e4..38ce1f2026 100644 --- a/encoding_helpers.go +++ b/encoding_helpers.go @@ -11,6 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//lint:file-ignore U1000 Ignore all unused code. + package tsdb import ( diff --git a/fileutil/fileutil.go b/fileutil/fileutil.go index 677df8c090..154fa18443 100644 --- a/fileutil/fileutil.go +++ b/fileutil/fileutil.go @@ -77,9 +77,8 @@ func copyFile(src, dest string) error { // returns relative paths to all files and empty directories. func readDirs(src string) ([]string, error) { var files []string - var err error - err = filepath.Walk(src, func(path string, f os.FileInfo, err error) error { + err := filepath.Walk(src, func(path string, f os.FileInfo, err error) error { relativePath := strings.TrimPrefix(path, src) if len(relativePath) > 0 { files = append(files, relativePath) diff --git a/head.go b/head.go index cbc8661f8c..8f22d2ba6b 100644 --- a/head.go +++ b/head.go @@ -1628,11 +1628,6 @@ func (ss stringset) set(s string) { ss[s] = struct{}{} } -func (ss stringset) has(s string) bool { - _, ok := ss[s] - return ok -} - func (ss stringset) String() string { return strings.Join(ss.slice(), ",") } diff --git a/head_test.go b/head_test.go index 8781f677af..1bbec5e2a5 100644 --- a/head_test.go +++ b/head_test.go @@ -584,7 +584,6 @@ func TestDelete_e2e(t *testing.T) { } } } - return } func boundedSamples(full []sample, mint, maxt int64) []sample { diff --git a/index/encoding_helpers.go b/index/encoding_helpers.go index 602498f115..9076dd3d36 100644 --- a/index/encoding_helpers.go +++ b/index/encoding_helpers.go @@ -11,6 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//lint:file-ignore U1000 Ignore all unused code. + package index import ( diff --git a/index/index.go b/index/index.go index 6413a9fca3..0a976187de 100644 --- a/index/index.go +++ b/index/index.go @@ -36,7 +36,6 @@ const ( // MagicIndex 4 bytes at the head of an index file. MagicIndex = 0xBAAAD700 - indexFormatV1 = 1 indexFormatV2 = 2 labelNameSeperator = "\xff" @@ -346,8 +345,6 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error { } sort.Strings(symbols) - const headerSize = 4 - w.buf1.reset() w.buf2.reset() @@ -563,7 +560,6 @@ type Reader struct { var ( errInvalidSize = fmt.Errorf("invalid size") - errInvalidFlag = fmt.Errorf("invalid flag") errInvalidChecksum = fmt.Errorf("invalid checksum") ) diff --git a/index/postings.go b/index/postings.go index 13df1c69a1..6212d07b4e 100644 --- a/index/postings.go +++ b/index/postings.go @@ -305,9 +305,8 @@ func Intersect(its ...Postings) Postings { } type intersectPostings struct { - a, b Postings - aok, bok bool - cur uint64 + a, b Postings + cur uint64 } func newIntersectPostings(a, b Postings) *intersectPostings { diff --git a/index/postings_test.go b/index/postings_test.go index 54c37f480c..668ec3609b 100644 --- a/index/postings_test.go +++ b/index/postings_test.go @@ -61,18 +61,6 @@ func TestMemPostings_ensureOrder(t *testing.T) { } } -type mockPostings struct { - next func() bool - seek func(uint64) bool - value func() uint64 - err func() error -} - -func (m *mockPostings) Next() bool { return m.next() } -func (m *mockPostings) Seek(v uint64) bool { return m.seek(v) } -func (m *mockPostings) Value() uint64 { return m.value() } -func (m *mockPostings) Err() error { return m.err() } - func TestIntersect(t *testing.T) { var cases = []struct { a, b []uint64 @@ -300,8 +288,6 @@ func TestMergedPostingsSeek(t *testing.T) { testutil.Equals(t, c.res, lst) } } - - return } func TestRemovedPostings(t *testing.T) { @@ -463,8 +449,6 @@ func TestRemovedPostingsSeek(t *testing.T) { testutil.Equals(t, c.res, lst) } } - - return } func TestBigEndian(t *testing.T) { diff --git a/querier_test.go b/querier_test.go index 79dfbff7a0..79daa5c01e 100644 --- a/querier_test.go +++ b/querier_test.go @@ -56,18 +56,6 @@ func newMockSeriesSet(list []Series) *mockSeriesSet { } } -type mockSeriesIterator struct { - seek func(int64) bool - at func() (int64, float64) - next func() bool - err func() error -} - -func (m *mockSeriesIterator) Seek(t int64) bool { return m.seek(t) } -func (m *mockSeriesIterator) At() (int64, float64) { return m.at() } -func (m *mockSeriesIterator) Next() bool { return m.next() } -func (m *mockSeriesIterator) Err() error { return m.err() } - type mockSeries struct { labels func() labels.Labels iterator func() SeriesIterator @@ -451,8 +439,6 @@ Outer: testutil.Equals(t, smplExp, smplRes) } } - - return } func TestBlockQuerierDelete(t *testing.T) { @@ -615,8 +601,6 @@ Outer: testutil.Equals(t, smplExp, smplRes) } } - - return } func TestBaseChunkSeries(t *testing.T) { @@ -713,8 +697,6 @@ func TestBaseChunkSeries(t *testing.T) { testutil.Equals(t, len(tc.expIdxs), i) testutil.Ok(t, bcs.Err()) } - - return } // TODO: Remove after simpleSeries is merged @@ -1038,8 +1020,6 @@ func TestSeriesIterator(t *testing.T) { } }) }) - - return } // Regression for: https://github.com/prometheus/tsdb/pull/97 @@ -1140,7 +1120,6 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { maxt: 15, } testutil.Assert(t, p.Next() == false, "") - return } type mockChunkSeriesSet struct { diff --git a/staticcheck.conf b/staticcheck.conf deleted file mode 100644 index 3266a2e297..0000000000 --- a/staticcheck.conf +++ /dev/null @@ -1,2 +0,0 @@ -# Enable only "legacy" staticcheck verifications. -checks = [ "SA*" ] diff --git a/tombstones_test.go b/tombstones_test.go index e12574f113..5bc1a7f2d4 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -120,7 +120,6 @@ func TestAddingNewIntervals(t *testing.T) { testutil.Equals(t, c.exp, c.exist.add(c.new)) } - return } // TestMemTombstonesConcurrency to make sure they are safe to access from different goroutines. From 47166a7969f8919598a6841606a5784ccdf70296 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Mon, 7 Jan 2019 12:39:11 +0100 Subject: [PATCH 02/24] Fix chunkenc/chunk_test.go Signed-off-by: Simon Pasquier --- chunkenc/chunk_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/chunkenc/chunk_test.go b/chunkenc/chunk_test.go index 45eb40cf51..11dc19079d 100644 --- a/chunkenc/chunk_test.go +++ b/chunkenc/chunk_test.go @@ -32,7 +32,7 @@ func TestChunk(t *testing.T) { for enc, nc := range map[Encoding]func() Chunk{ EncXOR: func() Chunk { return NewXORChunk() }, } { - t.Run(fmt.Sprintf("%d", enc), func(t *testing.T) { + t.Run(fmt.Sprintf("%v", enc), func(t *testing.T) { for range make([]struct{}, 1) { c := nc() if err := testChunk(c); err != nil { From 597202ae43c987fa866989fd4f1a9915c2055e32 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Mon, 7 Jan 2019 14:45:17 +0100 Subject: [PATCH 03/24] Remove more unused code Signed-off-by: Simon Pasquier --- chunkenc/bstream.go | 12 ---------- encoding_helpers.go | 46 ++++----------------------------------- index/encoding_helpers.go | 36 ++---------------------------- 3 files changed, 6 insertions(+), 88 deletions(-) diff --git a/chunkenc/bstream.go b/chunkenc/bstream.go index 72368dcf48..0a02a73035 100644 --- a/chunkenc/bstream.go +++ b/chunkenc/bstream.go @@ -39,8 +39,6 @@ // OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -//lint:file-ignore U1000 Ignore all unused code. - package chunkenc import "io" @@ -55,16 +53,6 @@ func newBReader(b []byte) bstream { return bstream{stream: b, count: 8} } -func newBWriter(size int) *bstream { - return &bstream{stream: make([]byte, 0, size), count: 0} -} - -func (b *bstream) clone() *bstream { - d := make([]byte, len(b.stream)) - copy(d, b.stream) - return &bstream{stream: d, count: b.count} -} - func (b *bstream) bytes() []byte { return b.stream } diff --git a/encoding_helpers.go b/encoding_helpers.go index 38ce1f2026..9c10e3160c 100644 --- a/encoding_helpers.go +++ b/encoding_helpers.go @@ -11,14 +11,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -//lint:file-ignore U1000 Ignore all unused code. - package tsdb import ( "encoding/binary" - "hash" - "hash/crc32" "unsafe" "github.com/pkg/errors" @@ -34,17 +30,12 @@ type encbuf struct { func (e *encbuf) reset() { e.b = e.b[:0] } func (e *encbuf) get() []byte { return e.b } -func (e *encbuf) len() int { return len(e.b) } func (e *encbuf) putString(s string) { e.b = append(e.b, s...) } -func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) } func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) } -func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) } -func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) } -func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) } -func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) } -func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } +func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) } +func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } func (e *encbuf) putBE32(x uint32) { binary.BigEndian.PutUint32(e.c[:], x) @@ -73,16 +64,6 @@ func (e *encbuf) putUvarintStr(s string) { e.putString(s) } -// putHash appends a hash over the buffers current contents to the buffer. -func (e *encbuf) putHash(h hash.Hash) { - h.Reset() - _, err := h.Write(e.b) - if err != nil { - panic(err) // The CRC32 implementation does not error - } - e.b = h.Sum(e.b) -} - // decbuf provides safe methods to extract data from a byte slice. It does all // necessary bounds checking and advancing of the byte slice. // Several datums can be extracted without checking for errors. However, before using @@ -92,15 +73,8 @@ type decbuf struct { e error } -func (d *decbuf) uvarint() int { return int(d.uvarint64()) } -func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } -func (d *decbuf) be32int() int { return int(d.be32()) } -func (d *decbuf) be64int64() int64 { return int64(d.be64()) } - -// crc32 returns a CRC32 checksum over the remaining bytes. -func (d *decbuf) crc32() uint32 { - return crc32.Checksum(d.b, castagnoliTable) -} +func (d *decbuf) uvarint() int { return int(d.uvarint64()) } +func (d *decbuf) be64int64() int64 { return int64(d.be64()) } func (d *decbuf) uvarintStr() string { l := d.uvarint64() @@ -181,18 +155,6 @@ func (d *decbuf) byte() byte { return x } -func (d *decbuf) decbuf(l int) decbuf { - if d.e != nil { - return decbuf{e: d.e} - } - if l > len(d.b) { - return decbuf{e: errInvalidSize} - } - r := decbuf{b: d.b[:l]} - d.b = d.b[l:] - return r -} - func (d *decbuf) err() error { return d.e } func (d *decbuf) len() int { return len(d.b) } func (d *decbuf) get() []byte { return d.b } diff --git a/index/encoding_helpers.go b/index/encoding_helpers.go index 9076dd3d36..9afb1a0552 100644 --- a/index/encoding_helpers.go +++ b/index/encoding_helpers.go @@ -11,8 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -//lint:file-ignore U1000 Ignore all unused code. - package index import ( @@ -33,12 +31,9 @@ func (e *encbuf) get() []byte { return e.b } func (e *encbuf) len() int { return len(e.b) } func (e *encbuf) putString(s string) { e.b = append(e.b, s...) } -func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) } func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) } func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) } -func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) } -func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) } func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) } func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } @@ -88,10 +83,8 @@ type decbuf struct { e error } -func (d *decbuf) uvarint() int { return int(d.uvarint64()) } -func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } -func (d *decbuf) be32int() int { return int(d.be32()) } -func (d *decbuf) be64int64() int64 { return int64(d.be64()) } +func (d *decbuf) uvarint() int { return int(d.uvarint64()) } +func (d *decbuf) be32int() int { return int(d.be32()) } // crc32 returns a CRC32 checksum over the remaining bytes. func (d *decbuf) crc32() uint32 { @@ -164,31 +157,6 @@ func (d *decbuf) be32() uint32 { return x } -func (d *decbuf) byte() byte { - if d.e != nil { - return 0 - } - if len(d.b) < 1 { - d.e = errInvalidSize - return 0 - } - x := d.b[0] - d.b = d.b[1:] - return x -} - -func (d *decbuf) decbuf(l int) decbuf { - if d.e != nil { - return decbuf{e: d.e} - } - if l > len(d.b) { - return decbuf{e: errInvalidSize} - } - r := decbuf{b: d.b[:l]} - d.b = d.b[l:] - return r -} - func (d *decbuf) err() error { return d.e } func (d *decbuf) len() int { return len(d.b) } func (d *decbuf) get() []byte { return d.b } From 62fca18f0aef82d0b428ffd06cbd861f2cc26f18 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Mon, 7 Jan 2019 17:37:12 +0100 Subject: [PATCH 04/24] Add back indexFormatV1 Signed-off-by: Simon Pasquier --- index/index.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/index/index.go b/index/index.go index 0a976187de..0c1018dba0 100644 --- a/index/index.go +++ b/index/index.go @@ -36,6 +36,7 @@ const ( // MagicIndex 4 bytes at the head of an index file. MagicIndex = 0xBAAAD700 + indexFormatV1 = 1 indexFormatV2 = 2 labelNameSeperator = "\xff" @@ -617,7 +618,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { } r.version = int(r.b.Range(4, 5)[0]) - if r.version != 1 && r.version != 2 { + if r.version != indexFormatV1 && r.version != indexFormatV2 { return nil, errors.Errorf("unknown index file version %d", r.version) } @@ -791,14 +792,14 @@ func (r *Reader) readSymbols(off int) error { basePos = uint32(off) + 4 nextPos = basePos + uint32(origLen-d.len()) ) - if r.version == 2 { + if r.version == indexFormatV2 { r.symbolSlice = make([]string, 0, cnt) } for d.err() == nil && d.len() > 0 && cnt > 0 { s := d.uvarintStr() - if r.version == 2 { + if r.version == indexFormatV2 { r.symbolSlice = append(r.symbolSlice, s) } else { r.symbols[nextPos] = s @@ -924,7 +925,7 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err offset := id // In version 2 series IDs are no longer exact references but series are 16-byte padded // and the ID is the multiple of 16 of the actual position. - if r.version == 2 { + if r.version == indexFormatV2 { offset = id * 16 } d := r.decbufUvarintAt(int(offset)) From d7e505db34f555c4e30cd2b2b6534aa6a091e724 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 8 Jan 2019 22:38:41 +0530 Subject: [PATCH 05/24] Dont store stones in head, delete samples directly Signed-off-by: Ganesh Vernekar --- db_test.go | 54 +++++++++---- head.go | 116 ++++++++++++++++++++++------ head_test.go | 179 ++++++++++++++++++++++++++++++------------- testutil/testutil.go | 9 +++ 4 files changed, 268 insertions(+), 90 deletions(-) diff --git a/db_test.go b/db_test.go index 92d487eecb..c63eaf7809 100644 --- a/db_test.go +++ b/db_test.go @@ -209,31 +209,48 @@ func TestDBAppenderAddRef(t *testing.T) { func TestDeleteSimple(t *testing.T) { numSamples := int64(10) - db, close := openTestDB(t, nil) - defer close() - defer db.Close() - - app := db.Appender() - - smpls := make([]float64, numSamples) - for i := int64(0); i < numSamples; i++ { - smpls[i] = rand.Float64() - app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) - } - - testutil.Ok(t, app.Commit()) cases := []struct { intervals Intervals remaint []int64 }{ + { + intervals: Intervals{{0, 3}}, + remaint: []int64{4, 5, 6, 7, 8, 9}, + }, + { + intervals: Intervals{{1, 3}}, + remaint: []int64{0, 4, 5, 6, 7, 8, 9}, + }, { intervals: Intervals{{1, 3}, {4, 7}}, remaint: []int64{0, 8, 9}, }, + { + intervals: Intervals{{1, 3}, {4, 700}}, + remaint: []int64{0}, + }, + { // This case is to ensure that labels and symbols are deleted. + intervals: Intervals{{0, 9}}, + remaint: []int64{}, + }, } Outer: for _, c := range cases { + db, close := openTestDB(t, nil) + defer close() + defer db.Close() + + app := db.Appender() + + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + } + + testutil.Ok(t, app.Commit()) + // TODO(gouthamve): Reset the tombstones somehow. // Delete the ranges. for _, r := range c.intervals { @@ -256,9 +273,20 @@ Outer: newSeries(map[string]string{"a": "b"}, expSamples), }) + lns, err := q.LabelNames() + testutil.Ok(t, err) + lvs, err := q.LabelValues("a") + testutil.Ok(t, err) if len(expSamples) == 0 { + testutil.Equals(t, 0, len(lns)) + testutil.Equals(t, 0, len(lvs)) testutil.Assert(t, res.Next() == false, "") continue + } else { + testutil.Equals(t, 1, len(lns)) + testutil.Equals(t, 1, len(lvs)) + testutil.Equals(t, "a", lns[0]) + testutil.Equals(t, "b", lvs[0]) } for { diff --git a/head.go b/head.go index cbc8661f8c..637e8477dc 100644 --- a/head.go +++ b/head.go @@ -48,6 +48,10 @@ var ( // ErrOutOfBounds is returned if an appended sample is out of the // writable time range. ErrOutOfBounds = errors.New("out of bounds") + + // emptyTombstoneReader is a no-op Tombstone Reader. + // This is used by head to satisfy the Tombstones() function call. + emptyTombstoneReader = newMemTombstones() ) // Head handles reads and writes of time series data within a time window. @@ -71,8 +75,6 @@ type Head struct { values map[string]stringset // label names to possible values postings *index.MemPostings // postings lists for terms - - tombstones *memTombstones } type headMetrics struct { @@ -231,7 +233,6 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), - tombstones: newMemTombstones(), } h.metrics = newHeadMetrics(h, r) @@ -334,12 +335,14 @@ func (h *Head) loadWAL(r *wal.Reader) error { } var ( - dec RecordDecoder - series []RefSeries - samples []RefSample - tstones []Stone - err error + dec RecordDecoder + series []RefSeries + samples []RefSample + tstones []Stone + allStones = newMemTombstones() + err error ) + defer allStones.Close() for r.Next() { series, samples, tstones = series[:0], samples[:0], tstones[:0] rec := r.Record() @@ -413,7 +416,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { if itv.Maxt < h.minValidTime { continue } - h.tombstones.addInterval(s.ref, itv) + allStones.addInterval(s.ref, itv) } } default: @@ -436,6 +439,12 @@ func (h *Head) loadWAL(r *wal.Reader) error { } wg.Wait() + if err := allStones.Iter(func(ref uint64, dranges Intervals) error { + return h.chunkRewrite(ref, dranges) + }); err != nil { + return errors.Wrap(r.Err(), "deleting samples from tombstones") + } + if unknownRefs > 0 { level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs) } @@ -604,7 +613,7 @@ func (h *rangeHead) Chunks() (ChunkReader, error) { } func (h *rangeHead) Tombstones() (TombstoneReader, error) { - return h.head.tombstones, nil + return emptyTombstoneReader, nil } // initAppender is a helper to initialize the time bounds of the head @@ -849,7 +858,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { } var stones []Stone - + dirty := false for p.Next() { series := h.series.getByID(p.At()) @@ -859,22 +868,61 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { } // Delete only until the current values and not beyond. t0, t1 = clampInterval(mint, maxt, t0, t1) - stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) + if h.wal != nil { + stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) + } + if err := h.chunkRewrite(p.At(), Intervals{{t0, t1}}); err != nil { + return errors.Wrap(err, "delete samples") + } + dirty = true } - if p.Err() != nil { return p.Err() } var enc RecordEncoder - if h.wal != nil { + // Although we don't store the stones in the head + // we need to write them to the WAL to mark these as deleted + // after a restart while loeading the WAL. if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil { return err } } - for _, s := range stones { - h.tombstones.addInterval(s.ref, s.intervals[0]) + if dirty { + h.gc() } + + return nil +} + +// chunkRewrite re-writes the chunks which overlaps with deleted ranges +// and removes the samples in the deleted ranges. +// Chunks is deleted if no samples are left at the end. +func (h *Head) chunkRewrite(ref uint64, dranges Intervals) (err error) { + if len(dranges) == 0 { + return nil + } + + ms := h.series.getByID(ref) + ms.Lock() + defer ms.Unlock() + if len(ms.chunks) == 0 { + return nil + } + + metas := ms.chunksMetas() + mint, maxt := metas[0].MinTime, metas[len(metas)-1].MaxTime + it := newChunkSeriesIterator(metas, dranges, mint, maxt) + + ms.reset() + for it.Next() { + t, v := it.At() + ok, _ := ms.append(t, v) + if !ok { + level.Warn(h.logger).Log("msg", "failed to add sample during delete") + } + } + return nil } @@ -926,7 +974,7 @@ func (h *Head) gc() { // Tombstones returns a new reader over the head's tombstones func (h *Head) Tombstones() (TombstoneReader, error) { - return h.tombstones, nil + return emptyTombstoneReader, nil } // Index returns an IndexReader against the block. @@ -1406,6 +1454,16 @@ type memSeries struct { app chunkenc.Appender // Current appender for the chunk. } +func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { + s := &memSeries{ + lset: lset, + ref: id, + chunkRange: chunkRange, + nextAt: math.MinInt64, + } + return s +} + func (s *memSeries) minTime() int64 { if len(s.chunks) == 0 { return math.MinInt64 @@ -1442,14 +1500,24 @@ func (s *memSeries) cut(mint int64) *memChunk { return c } -func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries { - s := &memSeries{ - lset: lset, - ref: id, - chunkRange: chunkRange, - nextAt: math.MinInt64, +func (s *memSeries) chunksMetas() []chunks.Meta { + metas := make([]chunks.Meta, 0, len(s.chunks)) + for _, chk := range s.chunks { + metas = append(metas, chunks.Meta{Chunk: chk.chunk, MinTime: chk.minTime, MaxTime: chk.maxTime}) } - return s + return metas +} + +// reset re-initialises all the variable in the memSeries except 'lset', 'ref', +// and 'chunkRange', like how it would appear after 'newMemSeries(...)'. +func (s *memSeries) reset() { + s.chunks = nil + s.headChunk = nil + s.firstChunkID = 0 + s.nextAt = math.MinInt64 + s.sampleBuf = [4]sample{} + s.pendingCommit = false + s.app = nil } // appendable checks whether the given sample is valid for appending to the series. diff --git a/head_test.go b/head_test.go index 8781f677af..cfc307f25f 100644 --- a/head_test.go +++ b/head_test.go @@ -18,6 +18,7 @@ import ( "math" "math/rand" "os" + "path" "path/filepath" "sort" "testing" @@ -297,92 +298,165 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { func TestHeadDeleteSimple(t *testing.T) { numSamples := int64(10) - head, err := NewHead(nil, nil, nil, 1000) - testutil.Ok(t, err) - defer head.Close() - - app := head.Appender() - - smpls := make([]float64, numSamples) - for i := int64(0); i < numSamples; i++ { - smpls[i] = rand.Float64() - app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) - } - - testutil.Ok(t, app.Commit()) cases := []struct { - intervals Intervals - remaint []int64 + dranges Intervals + remaint []int64 + remainSampbuf []int64 // Sample buffer that should remain after deletion. }{ { - intervals: Intervals{{0, 3}}, - remaint: []int64{4, 5, 6, 7, 8, 9}, + dranges: Intervals{{0, 3}}, + remaint: []int64{4, 5, 6, 7, 8, 9}, + remainSampbuf: []int64{6, 7, 8, 9}, }, { - intervals: Intervals{{1, 3}}, - remaint: []int64{0, 4, 5, 6, 7, 8, 9}, + dranges: Intervals{{1, 3}}, + remaint: []int64{0, 4, 5, 6, 7, 8, 9}, + remainSampbuf: []int64{6, 7, 8, 9}, }, { - intervals: Intervals{{1, 3}, {4, 7}}, - remaint: []int64{0, 8, 9}, + dranges: Intervals{{1, 3}, {4, 7}}, + remaint: []int64{0, 8, 9}, + remainSampbuf: []int64{0, 8, 9}, }, { - intervals: Intervals{{1, 3}, {4, 700}}, - remaint: []int64{0}, + dranges: Intervals{{1, 3}, {4, 700}}, + remaint: []int64{0}, + remainSampbuf: []int64{0}, }, - { - intervals: Intervals{{0, 9}}, - remaint: []int64{}, + { // This case is to ensure that labels and symbols are deleted. + dranges: Intervals{{0, 9}}, + remaint: []int64{}, + remainSampbuf: []int64{}, }, } Outer: for _, c := range cases { - // Reset the tombstones. - head.tombstones = newMemTombstones() + dir, err := ioutil.TempDir("", "test_wal_reload") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + w, err := wal.New(nil, nil, path.Join(dir, "wal")) + testutil.Ok(t, err) + + // Samples are deleted from head after calling head.Delete() + // and not just creating tombstones. + // Hence creating new Head for every case. + head, err := NewHead(nil, nil, w, 1000) + testutil.Ok(t, err) + + app := head.Appender() + smpls := make([]float64, numSamples) + for i := int64(0); i < numSamples; i++ { + smpls[i] = rand.Float64() + _, err = app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) // Delete the ranges. - for _, r := range c.intervals { + for _, r := range c.dranges { testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b"))) } - // Compare the result. - q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime()) + reloadedW, err := wal.New(nil, nil, w.Dir()) testutil.Ok(t, err) - res, err := q.Select(labels.NewEqualMatcher("a", "b")) + reloadedHead, err := NewHead(nil, nil, reloadedW, 1000) + // Test the head reloaded from the WAL to ensure deleted samples + // are gone even after reloading the wal file. testutil.Ok(t, err) + testutil.Ok(t, reloadedHead.Init(0)) - expSamples := make([]Sample, 0, len(c.remaint)) + expSamples := make([]sample, 0, len(c.remaint)) for _, ts := range c.remaint { expSamples = append(expSamples, sample{ts, smpls[ts]}) } - expss := newMockSeriesSet([]Series{ - newSeries(map[string]string{"a": "b"}, expSamples), - }) + // Compare the samples for both heads - before and after the reload. + for _, h := range []*Head{head, reloadedHead} { + indexr, err := h.Index() + testutil.Ok(t, err) + // We use emptyTombstoneReader explicitly to get all the samples. + css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher("a", "b")) + testutil.Ok(t, err) - if len(expSamples) == 0 { - testutil.Assert(t, res.Next() == false, "") - continue + // Getting the actual samples. + actSamples := make([]sample, 0, len(c.remaint)) + if len(expSamples) > 0 { + testutil.Assert(t, css.Next() == true, "") + lbls, chkMetas, intv := css.At() + testutil.Equals(t, labels.Labels{{"a", "b"}}, lbls) + testutil.Equals(t, 0, len(intv)) + + chunkr, err := h.Chunks() + testutil.Ok(t, err) + for _, meta := range chkMetas { + chk, err := chunkr.Chunk(meta.Ref) + testutil.Ok(t, err) + ii := chk.Iterator() + for ii.Next() { + t, v := ii.At() + actSamples = append(actSamples, sample{t: t, v: v}) + } + } + } + + testutil.Assert(t, css.Next() == false, "") + testutil.Ok(t, css.Err()) + testutil.Equals(t, expSamples, actSamples) } - for { - eok, rok := expss.Next(), res.Next() - testutil.Equals(t, eok, rok) + expSamplesTemp := make([]Sample, 0, len(c.remaint)) + for _, ts := range c.remaint { + expSamplesTemp = append(expSamplesTemp, sample{ts, smpls[ts]}) + } + expSeriesSet := newMockSeriesSet([]Series{ + newSeries(map[string]string{"a": "b"}, expSamplesTemp), + }) - if !eok { - continue Outer + // Compare the query results for both heads - before and after the reload. + for _, h := range []*Head{head, reloadedHead} { + q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) + testutil.Ok(t, err) + actSeriesSet, err := q.Select(labels.NewEqualMatcher("a", "b")) + testutil.Ok(t, err) + + lns, err := q.LabelNames() + testutil.Ok(t, err) + lvs, err := q.LabelValues("a") + testutil.Ok(t, err) + if len(expSamples) == 0 { + testutil.Equals(t, 0, len(lns)) + testutil.Equals(t, 0, len(lvs)) + testutil.Assert(t, actSeriesSet.Next() == false, "") + testutil.Ok(t, h.Close()) + continue + } else { + testutil.Equals(t, 1, len(lns)) + testutil.Equals(t, 1, len(lvs)) + testutil.Equals(t, "a", lns[0]) + testutil.Equals(t, "b", lvs[0]) } - sexp := expss.At() - sres := res.At() - testutil.Equals(t, sexp.Labels(), sres.Labels()) + for { + eok, rok := expSeriesSet.Next(), actSeriesSet.Next() + testutil.Equals(t, eok, rok) - smplExp, errExp := expandSeriesIterator(sexp.Iterator()) - smplRes, errRes := expandSeriesIterator(sres.Iterator()) + if !eok { + testutil.Ok(t, h.Close()) + continue Outer + } + expSeries := expSeriesSet.At() + actSeries := actSeriesSet.At() - testutil.Equals(t, errExp, errRes) - testutil.Equals(t, smplExp, smplRes) + testutil.Equals(t, expSeries.Labels(), actSeries.Labels()) + + smplExp, errExp := expandSeriesIterator(expSeries.Iterator()) + smplRes, errRes := expandSeriesIterator(actSeries.Iterator()) + + testutil.Equals(t, errExp, errRes) + testutil.Equals(t, smplExp, smplRes) + } } } } @@ -523,8 +597,6 @@ func TestDelete_e2e(t *testing.T) { // TODO: Add Regexp Matchers. } for _, del := range dels { - // Reset the deletes everytime. - hb.tombstones = newMemTombstones() for _, r := range del.drange { testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) } @@ -945,4 +1017,5 @@ func TestWalRepair(t *testing.T) { testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") }) } + } diff --git a/testutil/testutil.go b/testutil/testutil.go index 06b9747ca5..03784e7f2d 100644 --- a/testutil/testutil.go +++ b/testutil/testutil.go @@ -66,6 +66,15 @@ func Equals(tb testing.TB, exp, act interface{}, msgAndArgs ...interface{}) { } } +// NotEquals fails the test if exp is equal to act. +func NotEquals(tb testing.TB, exp, act interface{}) { + if reflect.DeepEqual(exp, act) { + _, file, line, _ := runtime.Caller(1) + fmt.Printf("\033[31m%s:%d: Expected different exp and got\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", filepath.Base(file), line, exp, act) + tb.FailNow() + } +} + func formatMessage(msgAndArgs []interface{}) string { if len(msgAndArgs) == 0 { return "" From 4592b77035ce33bd08a290586139f2ba102850f6 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Fri, 11 Jan 2019 22:04:09 +0530 Subject: [PATCH 06/24] Patch by Krasi (https://github.com/codesome/tsdb/pull/3) Signed-off-by: Ganesh Vernekar --- head_test.go | 116 ++++++++++++++++++++++++--------------------------- 1 file changed, 54 insertions(+), 62 deletions(-) diff --git a/head_test.go b/head_test.go index cfc307f25f..2f57cebbf7 100644 --- a/head_test.go +++ b/head_test.go @@ -296,37 +296,40 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { } func TestHeadDeleteSimple(t *testing.T) { - numSamples := int64(10) + + buildSmpls := func(s []int64) []sample { + ss := make([]sample, 0, len(s)) + for _, t := range s { + ss = append(ss, sample{t: t, v: float64(t)}) + } + return ss + } + smplsAll := buildSmpls([]int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) + lblDefault := labels.Label{"a", "b"} cases := []struct { - dranges Intervals - remaint []int64 - remainSampbuf []int64 // Sample buffer that should remain after deletion. + dranges Intervals + smplsExp []sample }{ { - dranges: Intervals{{0, 3}}, - remaint: []int64{4, 5, 6, 7, 8, 9}, - remainSampbuf: []int64{6, 7, 8, 9}, + dranges: Intervals{{0, 3}}, + smplsExp: buildSmpls([]int64{4, 5, 6, 7, 8, 9}), }, { - dranges: Intervals{{1, 3}}, - remaint: []int64{0, 4, 5, 6, 7, 8, 9}, - remainSampbuf: []int64{6, 7, 8, 9}, + dranges: Intervals{{1, 3}}, + smplsExp: buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9}), }, { - dranges: Intervals{{1, 3}, {4, 7}}, - remaint: []int64{0, 8, 9}, - remainSampbuf: []int64{0, 8, 9}, + dranges: Intervals{{1, 3}, {4, 7}}, + smplsExp: buildSmpls([]int64{0, 8, 9}), }, { - dranges: Intervals{{1, 3}, {4, 700}}, - remaint: []int64{0}, - remainSampbuf: []int64{0}, + dranges: Intervals{{1, 3}, {4, 700}}, + smplsExp: buildSmpls([]int64{0}), }, { // This case is to ensure that labels and symbols are deleted. - dranges: Intervals{{0, 9}}, - remaint: []int64{}, - remainSampbuf: []int64{}, + dranges: Intervals{{0, 9}}, + smplsExp: buildSmpls([]int64{}), }, } @@ -339,53 +342,40 @@ Outer: w, err := wal.New(nil, nil, path.Join(dir, "wal")) testutil.Ok(t, err) - // Samples are deleted from head after calling head.Delete() - // and not just creating tombstones. - // Hence creating new Head for every case. head, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) app := head.Appender() - smpls := make([]float64, numSamples) - for i := int64(0); i < numSamples; i++ { - smpls[i] = rand.Float64() - _, err = app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + for _, smpl := range smplsAll { + _, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) testutil.Ok(t, err) + } testutil.Ok(t, app.Commit()) // Delete the ranges. for _, r := range c.dranges { - testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b"))) - } - - reloadedW, err := wal.New(nil, nil, w.Dir()) - testutil.Ok(t, err) - reloadedHead, err := NewHead(nil, nil, reloadedW, 1000) - // Test the head reloaded from the WAL to ensure deleted samples - // are gone even after reloading the wal file. - testutil.Ok(t, err) - testutil.Ok(t, reloadedHead.Init(0)) - - expSamples := make([]sample, 0, len(c.remaint)) - for _, ts := range c.remaint { - expSamples = append(expSamples, sample{ts, smpls[ts]}) + testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))) } // Compare the samples for both heads - before and after the reload. + reloadedW, err := wal.New(nil, nil, w.Dir()) // Use a new wal to ensure deleted samples are gone even after a reload. + testutil.Ok(t, err) + reloadedHead, err := NewHead(nil, nil, reloadedW, 1000) + testutil.Ok(t, err) + testutil.Ok(t, reloadedHead.Init(0)) for _, h := range []*Head{head, reloadedHead} { indexr, err := h.Index() testutil.Ok(t, err) - // We use emptyTombstoneReader explicitly to get all the samples. - css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher("a", "b")) + // Use an emptyTombstoneReader explicitly to get all the samples. + css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)) testutil.Ok(t, err) // Getting the actual samples. - actSamples := make([]sample, 0, len(c.remaint)) - if len(expSamples) > 0 { - testutil.Assert(t, css.Next() == true, "") - lbls, chkMetas, intv := css.At() - testutil.Equals(t, labels.Labels{{"a", "b"}}, lbls) + actSamples := make([]sample, 0) + for css.Next() { + lblsAct, chkMetas, intv := css.At() + testutil.Equals(t, labels.Labels{lblDefault}, lblsAct) testutil.Equals(t, 0, len(intv)) chunkr, err := h.Chunks() @@ -401,31 +391,33 @@ Outer: } } - testutil.Assert(t, css.Next() == false, "") testutil.Ok(t, css.Err()) - testutil.Equals(t, expSamples, actSamples) + testutil.Equals(t, c.smplsExp, actSamples) } - expSamplesTemp := make([]Sample, 0, len(c.remaint)) - for _, ts := range c.remaint { - expSamplesTemp = append(expSamplesTemp, sample{ts, smpls[ts]}) - } - expSeriesSet := newMockSeriesSet([]Series{ - newSeries(map[string]string{"a": "b"}, expSamplesTemp), - }) - // Compare the query results for both heads - before and after the reload. + expSeriesSet := newMockSeriesSet([]Series{ + newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []Sample { + ss := make([]Sample, 0, len(c.smplsExp)) + for _, s := range c.smplsExp { + ss = append(ss, s) + } + return ss + }(), + ), + }) for _, h := range []*Head{head, reloadedHead} { q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) testutil.Ok(t, err) - actSeriesSet, err := q.Select(labels.NewEqualMatcher("a", "b")) + actSeriesSet, err := q.Select(labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)) testutil.Ok(t, err) lns, err := q.LabelNames() testutil.Ok(t, err) - lvs, err := q.LabelValues("a") + lvs, err := q.LabelValues(lblDefault.Name) testutil.Ok(t, err) - if len(expSamples) == 0 { + // When all samples are deleted we expect that no labels should exist either. + if len(c.smplsExp) == 0 { testutil.Equals(t, 0, len(lns)) testutil.Equals(t, 0, len(lvs)) testutil.Assert(t, actSeriesSet.Next() == false, "") @@ -434,8 +426,8 @@ Outer: } else { testutil.Equals(t, 1, len(lns)) testutil.Equals(t, 1, len(lvs)) - testutil.Equals(t, "a", lns[0]) - testutil.Equals(t, "b", lvs[0]) + testutil.Equals(t, lblDefault.Name, lns[0]) + testutil.Equals(t, lblDefault.Value, lvs[0]) } for { From b521559c3b22c00327675906acd5d2e0bfcdc044 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Tue, 29 Jan 2019 10:26:01 +0200 Subject: [PATCH 07/24] do a proper cleanup for a failed reload after a compaction a failed reload immediately after a compaction should delete the resulting block to avoid creating blocks with the same time range. Signed-off-by: Krasi Georgiev --- compact_test.go | 76 +++++++++++++++++++++++++++++++++++++++++++++++++ db.go | 11 +++++-- 2 files changed, 85 insertions(+), 2 deletions(-) diff --git a/compact_test.go b/compact_test.go index 9a45229f21..b496e8eac5 100644 --- a/compact_test.go +++ b/compact_test.go @@ -17,12 +17,14 @@ import ( "io/ioutil" "math" "os" + "path" "path/filepath" "testing" "time" "github.com/go-kit/kit/log" "github.com/pkg/errors" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" dto "github.com/prometheus/client_model/go" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" @@ -741,3 +743,77 @@ func TestDisableAutoCompactions(t *testing.T) { } testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.") } + +// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload imidiately after a compaction +// deletes the resulting block to avoid creatings blocks with the same time range. +func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { + + tests := map[string]func(*DB) int{ + "Test Head Compaction": func(db *DB) int { + rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1 + defaultLabel := labels.FromStrings("foo", "bar") + + // Add some data to the head that is enough to trigger a compaction. + app := db.Appender() + _, err := app.Add(defaultLabel, 1, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, 2, 0) + testutil.Ok(t, err) + _, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + return 1 + }, + "Test Block Compaction": func(db *DB) int { + expBlocks := []*BlockMeta{ + {MinTime: 0, MaxTime: 100}, + {MinTime: 100, MaxTime: 150}, + {MinTime: 150, MaxTime: 200}, + } + for _, m := range expBlocks { + createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) + } + testutil.Ok(t, db.reload()) + testutil.Equals(t, len(expBlocks), len(db.Blocks()), "unexpected block count after a reload") + + return len(expBlocks) + 1 + }, + } + + for title, bootStrap := range tests { + t.Run(title, func(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{1, 100}, + }) + defer close() + defer db.Close() + db.DisableCompactions() + + expBlocks := bootStrap(db) + + // Create a block that will trigger the reloard to fail. + blockPath := createBlock(t, db.Dir(), genSeries(1, 1, 200, 300)) + lastBlockIndex := path.Join(blockPath, indexFilename) + actBlocks, err := blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, expBlocks, len(actBlocks)) + testutil.Ok(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file. + + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reload' count metrics mismatch") + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch") + + // Do the compaction and check the metrics. + // Since the most recent block is not included in the compaction, + // the compaction should succeed, but the reload should fail and + // the new block created from the compaction should be deleted. + db.EnableCompactions() + testutil.NotOk(t, db.compact()) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reload' count metrics mismatch") + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch") + actBlocks, err = blockDirs(db.Dir()) + testutil.Ok(t, err) + testutil.Equals(t, expBlocks, len(actBlocks)) + }) + } +} diff --git a/db.go b/db.go index bd3388b203..8b2e795ca7 100644 --- a/db.go +++ b/db.go @@ -425,6 +425,9 @@ func (db *DB) compact() (err error) { runtime.GC() if err := db.reload(); err != nil { + if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { + return errors.Wrapf(err, "delete persisted head block after unsuccessful db reload:%s", uid) + } return errors.Wrap(err, "reload blocks") } if (uid == ulid.ULID{}) { @@ -454,12 +457,16 @@ func (db *DB) compact() (err error) { default: } - if _, err := db.compactor.Compact(db.dir, plan, db.blocks); err != nil { + uid, err := db.compactor.Compact(db.dir, plan, db.blocks) + if err != nil { return errors.Wrapf(err, "compact %s", plan) } runtime.GC() if err := db.reload(); err != nil { + if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { + return errors.Wrapf(err, "delete compacted block after unsuccessful db reload:%s", uid) + } return errors.Wrap(err, "reload blocks") } runtime.GC() @@ -505,7 +512,7 @@ func (db *DB) reload() (err error) { } } if len(corrupted) > 0 { - return errors.Wrap(err, "unexpected corrupted block") + return fmt.Errorf("unexpected corrupted block:%v", corrupted) } // All deletable blocks should not be loaded. From 53c18e7a41c019982ff8d4b96578f95cb47335a9 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Tue, 29 Jan 2019 10:32:32 +0200 Subject: [PATCH 08/24] use a global indexFilename constant Signed-off-by: Krasi Georgiev --- block.go | 2 +- index/index.go | 2 ++ index/index_test.go | 8 ++++---- repair.go | 2 +- repair_test.go | 6 +++--- 5 files changed, 11 insertions(+), 9 deletions(-) diff --git a/block.go b/block.go index 42e11d9513..3ed8c4a640 100644 --- a/block.go +++ b/block.go @@ -282,7 +282,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error if err != nil { return nil, err } - ir, err := index.NewFileReader(filepath.Join(dir, "index")) + ir, err := index.NewFileReader(filepath.Join(dir, indexFilename)) if err != nil { return nil, err } diff --git a/index/index.go b/index/index.go index 74e08d4651..ca33eac3fc 100644 --- a/index/index.go +++ b/index/index.go @@ -45,6 +45,8 @@ const ( FormatV2 = 2 labelNameSeperator = "\xff" + + indexFilename = "index" ) type indexWriterSeries struct { diff --git a/index/index_test.go b/index/index_test.go index 2edd3956a4..25c5be938b 100644 --- a/index/index_test.go +++ b/index/index_test.go @@ -151,7 +151,7 @@ func TestIndexRW_Create_Open(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - fn := filepath.Join(dir, "index") + fn := filepath.Join(dir, indexFilename) // An empty index must still result in a readable file. iw, err := NewWriter(fn) @@ -177,7 +177,7 @@ func TestIndexRW_Postings(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - fn := filepath.Join(dir, "index") + fn := filepath.Join(dir, indexFilename) iw, err := NewWriter(fn) testutil.Ok(t, err) @@ -271,7 +271,7 @@ func TestPersistence_index_e2e(t *testing.T) { }) } - iw, err := NewWriter(filepath.Join(dir, "index")) + iw, err := NewWriter(filepath.Join(dir, indexFilename)) testutil.Ok(t, err) testutil.Ok(t, iw.AddSymbols(symbols)) @@ -331,7 +331,7 @@ func TestPersistence_index_e2e(t *testing.T) { err = iw.Close() testutil.Ok(t, err) - ir, err := NewFileReader(filepath.Join(dir, "index")) + ir, err := NewFileReader(filepath.Join(dir, indexFilename)) testutil.Ok(t, err) for p := range mi.postings { diff --git a/repair.go b/repair.go index 15f79d5f78..4aeffb5547 100644 --- a/repair.go +++ b/repair.go @@ -64,7 +64,7 @@ func repairBadIndexVersion(logger log.Logger, dir string) error { if err != nil { return wrapErr(err, d) } - broken, err := os.Open(filepath.Join(d, "index")) + broken, err := os.Open(filepath.Join(d, indexFilename)) if err != nil { return wrapErr(err, d) } diff --git a/repair_test.go b/repair_test.go index 5fb780a5bb..cbe21691e6 100644 --- a/repair_test.go +++ b/repair_test.go @@ -30,7 +30,7 @@ func TestRepairBadIndexVersion(t *testing.T) { // at a broken revision. // // func main() { - // w, err := index.NewWriter("index") + // w, err := index.NewWriter(indexFilename) // if err != nil { // panic(err) // } @@ -72,7 +72,7 @@ func TestRepairBadIndexVersion(t *testing.T) { os.MkdirAll(filepath.Join(dbDir, "chunks"), 0777) defer os.RemoveAll(filepath.Join(dbDir, "chunks")) - r, err := index.NewFileReader(filepath.Join(dbDir, "index")) + r, err := index.NewFileReader(filepath.Join(dbDir, indexFilename)) testutil.Ok(t, err) p, err := r.Postings("b", "1") testutil.Ok(t, err) @@ -95,7 +95,7 @@ func TestRepairBadIndexVersion(t *testing.T) { testutil.Ok(t, err) db.Close() - r, err = index.NewFileReader(filepath.Join(tmpDbDir, "index")) + r, err = index.NewFileReader(filepath.Join(tmpDbDir, indexFilename)) testutil.Ok(t, err) p, err = r.Postings("b", "1") testutil.Ok(t, err) From 3ec08eac5062396f08e4e5f98c76ff7bf7b0e6ec Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Tue, 29 Jan 2019 10:32:59 +0200 Subject: [PATCH 09/24] use camelcase for rangeToTriggerCompaction Signed-off-by: Krasi Georgiev --- db_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/db_test.go b/db_test.go index e3e90ce8b3..15da12847f 100644 --- a/db_test.go +++ b/db_test.go @@ -1373,7 +1373,7 @@ func TestNoEmptyBlocks(t *testing.T) { defer db.Close() db.DisableCompactions() - rangeToTriggercompaction := db.opts.BlockRanges[0]/2*3 - 1 + rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1 defaultLabel := labels.FromStrings("foo", "bar") defaultMatcher := labels.NewMustRegexpMatcher("", ".*") @@ -1392,7 +1392,7 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Ok(t, err) _, err = app.Add(defaultLabel, 2, 0) testutil.Ok(t, err) - _, err = app.Add(defaultLabel, 3+rangeToTriggercompaction, 0) + _, err = app.Add(defaultLabel, 3+rangeToTriggerCompaction, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.Ok(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) @@ -1414,7 +1414,7 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Ok(t, err) _, err = app.Add(defaultLabel, currentTime+1, 0) testutil.Ok(t, err) - _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) + _, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) @@ -1435,7 +1435,7 @@ func TestNoEmptyBlocks(t *testing.T) { testutil.Ok(t, err) _, err = app.Add(defaultLabel, currentTime+1, 0) testutil.Ok(t, err) - _, err = app.Add(defaultLabel, currentTime+rangeToTriggercompaction, 0) + _, err = app.Add(defaultLabel, currentTime+rangeToTriggerCompaction, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) testutil.Ok(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) From 1603222bbc8ff6dedb79b047d9fd7535f96720ec Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 30 Jan 2019 11:40:12 +0200 Subject: [PATCH 10/24] small refactor of openTestDB to handle errors properly. Signed-off-by: Krasi Georgiev --- compact_test.go | 16 +++-- db_test.go | 188 +++++++++++++++++++++++++++++------------------- 2 files changed, 124 insertions(+), 80 deletions(-) diff --git a/compact_test.go b/compact_test.go index b496e8eac5..7c5f4f05fc 100644 --- a/compact_test.go +++ b/compact_test.go @@ -693,9 +693,11 @@ func TestCompaction_populateBlock(t *testing.T) { // This is needed for unit tests that rely on // checking state before and after a compaction. func TestDisableAutoCompactions(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() blockRange := DefaultOptions.BlockRanges[0] label := labels.FromStrings("foo", "bar") @@ -783,11 +785,13 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { for title, bootStrap := range tests { t.Run(title, func(t *testing.T) { - db, close := openTestDB(t, &Options{ + db, delete := openTestDB(t, &Options{ BlockRanges: []int64{1, 100}, }) - defer close() - defer db.Close() + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() db.DisableCompactions() expBlocks := bootStrap(db) diff --git a/db_test.go b/db_test.go index 3aa5e29b2b..f8a6119f82 100644 --- a/db_test.go +++ b/db_test.go @@ -46,7 +46,9 @@ func openTestDB(t testing.TB, opts *Options) (db *DB, close func()) { testutil.Ok(t, err) // Do not close the test database by default as it will deadlock on test failures. - return db, func() { os.RemoveAll(tmpdir) } + return db, func() { + testutil.Ok(t, os.RemoveAll(tmpdir)) + } } // query runs a matcher query against the querier and fully expands its data. @@ -78,9 +80,11 @@ func query(t testing.TB, q Querier, matchers ...labels.Matcher) map[string][]sam // Ensure that blocks are held in memory in their time order // and not in ULID order as they are read from the directory. func TestDB_reloadOrder(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() metas := []BlockMeta{ {MinTime: 90, MaxTime: 100}, @@ -106,9 +110,11 @@ func TestDB_reloadOrder(t *testing.T) { } func TestDataAvailableOnlyAfterCommit(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() @@ -135,9 +141,11 @@ func TestDataAvailableOnlyAfterCommit(t *testing.T) { } func TestDataNotAvailableAfterRollback(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() _, err := app.Add(labels.FromStrings("foo", "bar"), 0, 0) @@ -156,9 +164,11 @@ func TestDataNotAvailableAfterRollback(t *testing.T) { } func TestDBAppenderAddRef(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app1 := db.Appender() @@ -213,9 +223,11 @@ func TestDBAppenderAddRef(t *testing.T) { func TestDeleteSimple(t *testing.T) { numSamples := int64(10) - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() @@ -287,9 +299,11 @@ Outer: } func TestAmendDatapointCausesError(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() _, err := app.Add(labels.Labels{}, 0, 0) @@ -303,9 +317,11 @@ func TestAmendDatapointCausesError(t *testing.T) { } func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() _, err := app.Add(labels.Labels{}, 0, math.NaN()) @@ -318,10 +334,11 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { } func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() - + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() _, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001)) testutil.Ok(t, err) @@ -333,9 +350,11 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { } func TestSkippingInvalidValuesInSameTxn(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() // Append AmendedValue. app := db.Appender() @@ -377,8 +396,8 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { } func TestDB_Snapshot(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() + db, delete := openTestDB(t, nil) + defer delete() // append data app := db.Appender() @@ -401,11 +420,11 @@ func TestDB_Snapshot(t *testing.T) { // reopen DB from snapshot db, err = Open(snap, nil, nil, nil) testutil.Ok(t, err) - defer db.Close() + defer func() { testutil.Ok(t, db.Close()) }() querier, err := db.Querier(mint, mint+1000) testutil.Ok(t, err) - defer querier.Close() + defer func() { querier.Close() }() // sum values seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar")) @@ -427,8 +446,8 @@ func TestDB_Snapshot(t *testing.T) { func TestDB_SnapshotWithDelete(t *testing.T) { numSamples := int64(10) - db, close := openTestDB(t, nil) - defer close() + db, delete := openTestDB(t, nil) + defer delete() app := db.Appender() @@ -468,12 +487,12 @@ Outer: // reopen DB from snapshot db, err = Open(snap, nil, nil, nil) testutil.Ok(t, err) - defer db.Close() + defer func() { testutil.Ok(t, db.Close()) }() // Compare the result. q, err := db.Querier(0, numSamples) testutil.Ok(t, err) - defer q.Close() + defer func() { testutil.Ok(t, q.Close()) }() res, err := q.Select(labels.NewEqualMatcher("a", "b")) testutil.Ok(t, err) @@ -570,9 +589,11 @@ func TestDB_e2e(t *testing.T) { seriesMap[labels.New(l...).String()] = []sample{} } - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() @@ -675,8 +696,8 @@ func TestDB_e2e(t *testing.T) { } func TestWALFlushedOnDBClose(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() + db, delete := openTestDB(t, nil) + defer delete() dirDb := db.Dir() @@ -691,7 +712,7 @@ func TestWALFlushedOnDBClose(t *testing.T) { db, err = Open(dirDb, nil, nil, nil) testutil.Ok(t, err) - defer db.Close() + defer func() { testutil.Ok(t, db.Close()) }() q, err := db.Querier(0, 1) testutil.Ok(t, err) @@ -704,8 +725,8 @@ func TestWALFlushedOnDBClose(t *testing.T) { func TestWALSegmentSizeOption(t *testing.T) { options := *DefaultOptions options.WALSegmentSize = 2 * 32 * 1024 - db, close := openTestDB(t, &options) - defer close() + db, delete := openTestDB(t, &options) + defer delete() app := db.Appender() for i := int64(0); i < 155; i++ { _, err := app.Add(labels.Labels{labels.Label{Name: "wal", Value: "size"}}, i, rand.Float64()) @@ -730,8 +751,8 @@ func TestWALSegmentSizeOption(t *testing.T) { func TestTombstoneClean(t *testing.T) { numSamples := int64(10) - db, close := openTestDB(t, nil) - defer close() + db, delete := openTestDB(t, nil) + defer delete() app := db.Appender() @@ -827,9 +848,11 @@ func TestTombstoneClean(t *testing.T) { // if TombstoneClean leaves any blocks behind these will overlap. func TestTombstoneCleanFail(t *testing.T) { - db, close := openTestDB(t, nil) - defer db.Close() - defer close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() var expectedBlockDirs []string @@ -906,11 +929,13 @@ func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block) } func TestTimeRetention(t *testing.T) { - db, close := openTestDB(t, &Options{ + db, delete := openTestDB(t, &Options{ BlockRanges: []int64{1000}, }) - defer close() - defer db.Close() + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() blocks := []*BlockMeta{ {MinTime: 500, MaxTime: 900}, // Oldest block @@ -938,11 +963,13 @@ func TestTimeRetention(t *testing.T) { } func TestSizeRetention(t *testing.T) { - db, close := openTestDB(t, &Options{ + db, delete := openTestDB(t, &Options{ BlockRanges: []int64{100}, }) - defer close() - defer db.Close() + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() blocks := []*BlockMeta{ {MinTime: 100, MaxTime: 200}, // Oldest block @@ -1000,8 +1027,11 @@ func dbDiskSize(dir string) int64 { } func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() labelpairs := []labels.Labels{ labels.FromStrings("a", "abcd", "b", "abcde"), @@ -1059,7 +1089,7 @@ func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { q, err := db.Querier(0, 10) testutil.Ok(t, err) - defer q.Close() + defer func() { testutil.Ok(t, q.Close()) }() for _, c := range cases { ss, err := q.Select(c.selector...) @@ -1175,9 +1205,11 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) { // Regression test for https://github.com/prometheus/tsdb/issues/347 func TestChunkAtBlockBoundary(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() @@ -1229,9 +1261,11 @@ func TestChunkAtBlockBoundary(t *testing.T) { } func TestQuerierWithBoundaryChunks(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() app := db.Appender() @@ -1366,11 +1400,13 @@ func TestInitializeHeadTimestamp(t *testing.T) { } func TestNoEmptyBlocks(t *testing.T) { - db, close := openTestDB(t, &Options{ + db, delete := openTestDB(t, &Options{ BlockRanges: []int64{100}, }) - defer close() - defer db.Close() + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() db.DisableCompactions() rangeToTriggerCompaction := db.opts.BlockRanges[0]/2*3 - 1 @@ -1524,9 +1560,11 @@ func TestDB_LabelNames(t *testing.T) { testutil.Ok(t, err) } for _, tst := range tests { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() appendSamples(db, 0, 4, tst.sampleLabels1) @@ -1567,9 +1605,11 @@ func TestDB_LabelNames(t *testing.T) { } func TestCorrectNumTombstones(t *testing.T) { - db, close := openTestDB(t, nil) - defer close() - defer db.Close() + db, delete := openTestDB(t, nil) + defer func() { + testutil.Ok(t, db.Close()) + delete() + }() blockRange := DefaultOptions.BlockRanges[0] defaultLabel := labels.FromStrings("foo", "bar") From 315de4c7825519408cd1ece09e8fa4bee3eaa9d9 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 30 Jan 2019 11:40:40 +0200 Subject: [PATCH 11/24] fix windows tests Signed-off-by: Krasi Georgiev --- block.go | 20 ++++++++++++++++++-- db.go | 6 ++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/block.go b/block.go index 3ed8c4a640..26853ea304 100644 --- a/block.go +++ b/block.go @@ -269,7 +269,7 @@ type Block struct { // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // to instantiate chunk structs. -func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) { +func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) { if logger == nil { logger = log.NewNopLogger() } @@ -282,15 +282,31 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error if err != nil { return nil, err } + defer func() { + if err != nil { + cr.Close() + } + }() ir, err := index.NewFileReader(filepath.Join(dir, indexFilename)) if err != nil { return nil, err } + defer func() { + if err != nil { + ir.Close() + } + }() + tr, tsr, err := readTombstones(dir) if err != nil { return nil, err } + defer func() { + if err != nil { + tr.Close() + } + }() // TODO refactor to set this at block creation time as // that would be the logical place for a block size to be calculated. @@ -301,7 +317,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err) } - pb := &Block{ + pb = &Block{ dir: dir, meta: *meta, chunkr: cr, diff --git a/db.go b/db.go index 8b2e795ca7..5063b6837a 100644 --- a/db.go +++ b/db.go @@ -512,6 +512,12 @@ func (db *DB) reload() (err error) { } } if len(corrupted) > 0 { + // Close all new blocks to release the lock for windows. + for _, block := range loadable { + if _, loaded := db.getBlock(block.Meta().ULID); !loaded { + block.Close() + } + } return fmt.Errorf("unexpected corrupted block:%v", corrupted) } From 6c34eb8b636b3d08432e96acce0c6d856fd4c94b Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 30 Jan 2019 11:40:55 +0200 Subject: [PATCH 12/24] nits Signed-off-by: Krasi Georgiev --- compact_test.go | 19 +++++++++---------- db.go | 4 ++-- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/compact_test.go b/compact_test.go index 7c5f4f05fc..c22babe61a 100644 --- a/compact_test.go +++ b/compact_test.go @@ -765,21 +765,21 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { testutil.Ok(t, err) testutil.Ok(t, app.Commit()) - return 1 + return 0 }, "Test Block Compaction": func(db *DB) int { - expBlocks := []*BlockMeta{ + blocks := []*BlockMeta{ {MinTime: 0, MaxTime: 100}, {MinTime: 100, MaxTime: 150}, {MinTime: 150, MaxTime: 200}, } - for _, m := range expBlocks { + for _, m := range blocks { createBlock(t, db.Dir(), genSeries(1, 1, m.MinTime, m.MaxTime)) } testutil.Ok(t, db.reload()) - testutil.Equals(t, len(expBlocks), len(db.Blocks()), "unexpected block count after a reload") + testutil.Equals(t, len(blocks), len(db.Blocks()), "unexpected block count after a reload") - return len(expBlocks) + 1 + return len(blocks) }, } @@ -801,15 +801,14 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { lastBlockIndex := path.Join(blockPath, indexFilename) actBlocks, err := blockDirs(db.Dir()) testutil.Ok(t, err) - testutil.Equals(t, expBlocks, len(actBlocks)) - testutil.Ok(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file. + testutil.Equals(t, expBlocks, len(actBlocks)-1) // -1 to exclude the corrupted block. + testutil.Ok(t, os.RemoveAll(lastBlockIndex)) // Corrupt the block by removing the index file. testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "initial 'failed db reload' count metrics mismatch") testutil.Equals(t, 0.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "initial `compactions` count metric mismatch") // Do the compaction and check the metrics. - // Since the most recent block is not included in the compaction, - // the compaction should succeed, but the reload should fail and + // Compaction should succeed, but the reload should fail and // the new block created from the compaction should be deleted. db.EnableCompactions() testutil.NotOk(t, db.compact()) @@ -817,7 +816,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch") actBlocks, err = blockDirs(db.Dir()) testutil.Ok(t, err) - testutil.Equals(t, expBlocks, len(actBlocks)) + testutil.Equals(t, expBlocks, len(actBlocks)-1, "block count should be the same as before the compaction") // -1 to exclude the corrupted block. }) } } diff --git a/db.go b/db.go index 5063b6837a..3bce5b08ba 100644 --- a/db.go +++ b/db.go @@ -426,7 +426,7 @@ func (db *DB) compact() (err error) { if err := db.reload(); err != nil { if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { - return errors.Wrapf(err, "delete persisted head block after unsuccessful db reload:%s", uid) + return errors.Wrapf(err, "delete persisted head block after failed db reload:%s", uid) } return errors.Wrap(err, "reload blocks") } @@ -465,7 +465,7 @@ func (db *DB) compact() (err error) { if err := db.reload(); err != nil { if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { - return errors.Wrapf(err, "delete compacted block after unsuccessful db reload:%s", uid) + return errors.Wrapf(err, "delete compacted block after failed db reload:%s", uid) } return errors.Wrap(err, "reload blocks") } From 1bcda9d23f9951f1333cd2a3ba2a97b6b6ec69c4 Mon Sep 17 00:00:00 2001 From: Alec <867245430@qq.com> Date: Mon, 4 Feb 2019 17:30:47 +0800 Subject: [PATCH 13/24] Update tombstones.md (format of tombstone) (#511) --- docs/format/tombstones.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/format/tombstones.md b/docs/format/tombstones.md index 308a458a48..058f5f1e2f 100644 --- a/docs/format/tombstones.md +++ b/docs/format/tombstones.md @@ -25,7 +25,7 @@ The stones section is 0 padded to a multiple of 4 for fast scans. # Tombstone ``` -┌─────────────┬───────────────┬──────────────┐ -│ref │ mint │ maxt │ -└─────────────┴───────────────┴──────────────┘ +┌────────────────┬─────────────────┬────────────────┐ +│ref │ mint │ maxt │ +└────────────────┴─────────────────┴────────────────┘ ``` From 2b6bc9fb3250015cfc4d1041dbcea06fcd2a4222 Mon Sep 17 00:00:00 2001 From: Alec <867245430@qq.com> Date: Tue, 5 Feb 2019 10:23:14 +0800 Subject: [PATCH 14/24] Update index.md Signed-off-by: naivewong <867245430@qq.com> --- docs/format/index.md | 56 ++++++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/docs/format/index.md b/docs/format/index.md index 18600c8351..bd8b8f1963 100644 --- a/docs/format/index.md +++ b/docs/format/index.md @@ -85,34 +85,34 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc `mint` of the first chunk is stored, it's `maxt` is stored as a delta and the `mint` and `maxt` are encoded as deltas to the previous time for subsequent chunks. Similarly, the reference of the first chunk is stored and the next ref is stored as a delta to the previous one. ``` -┌─────────────────────────────────────────────────────────────────────────┐ -│ len │ -├─────────────────────────────────────────────────────────────────────────┤ -│ ┌──────────────────┬──────────────────────────────────────────────────┐ │ -│ │ │ ┌──────────────────────────────────────────┐ │ │ -│ │ │ │ ref(l_i.name) │ │ │ -│ │ #labels │ ├──────────────────────────────────────────┤ ... │ │ -│ │ │ │ ref(l_i.value) │ │ │ -│ │ │ └──────────────────────────────────────────┘ │ │ -│ ├──────────────────┼──────────────────────────────────────────────────┤ │ -│ │ │ ┌──────────────────────────────────────────┐ │ │ -│ │ │ │ c_0.mint │ │ │ -│ │ │ ├──────────────────────────────────────────┤ │ │ -│ │ │ │ c_0.maxt - c_0.mint │ │ │ -│ │ │ ├──────────────────────────────────────────┤ │ │ -│ │ │ │ ref(c_0.data) │ │ │ -│ │ #chunks │ └──────────────────────────────────────────┘ │ │ -│ │ │ ┌──────────────────────────────────────────┐ │ │ -│ │ │ │ c_i.mint - c_i-1.maxt │ │ │ -│ │ │ ├──────────────────────────────────────────┤ │ │ -│ │ │ │ c_i.maxt - c_i.mint │ │ │ -│ │ │ ├──────────────────────────────────────────┤ ... │ │ -│ │ │ │ ref(c_i.data) - ref(c_i-1.data) │ │ │ -│ │ │ └──────────────────────────────────────────┘ │ │ -│ └──────────────────┴──────────────────────────────────────────────────┘ │ -├─────────────────────────────────────────────────────────────────────────┤ -│ CRC32 <4b> │ -└─────────────────────────────────────────────────────────────────────────┘ +┌───────────────────────────────────────────────────────────────────────────┐ +│ len │ +├───────────────────────────────────────────────────────────────────────────┤ +│ ┌──────────────────┬────────────────────────────────────────────────────┐ │ +│ │ │ ┌────────────────────────────────────────────┐ │ │ +│ │ │ │ ref(l_i.name) │ │ │ +│ │ #labels │ ├────────────────────────────────────────────┤ ... │ │ +│ │ │ │ ref(l_i.value) │ │ │ +│ │ │ └────────────────────────────────────────────┘ │ │ +│ ├──────────────────┼────────────────────────────────────────────────────┤ │ +│ │ │ ┌────────────────────────────────────────────┐ │ │ +│ │ │ │ c_0.mint │ │ │ +│ │ │ ├────────────────────────────────────────────┤ │ │ +│ │ │ │ c_0.maxt - c_0.mint │ │ │ +│ │ │ ├────────────────────────────────────────────┤ │ │ +│ │ │ │ ref(c_0.data) │ │ │ +│ │ #chunks │ └────────────────────────────────────────────┘ │ │ +│ │ │ ┌────────────────────────────────────────────┐ │ │ +│ │ │ │ c_i.mint - c_i-1.maxt │ │ │ +│ │ │ ├────────────────────────────────────────────┤ │ │ +│ │ │ │ c_i.maxt - c_i.mint │ │ │ +│ │ │ ├────────────────────────────────────────────┤ ... │ │ +│ │ │ │ ref(c_i.data) - ref(c_i-1.data) │ │ │ +│ │ │ └────────────────────────────────────────────┘ │ │ +│ └──────────────────┴────────────────────────────────────────────────────┘ │ +├───────────────────────────────────────────────────────────────────────────┤ +│ CRC32 <4b> │ +└───────────────────────────────────────────────────────────────────────────┘ ``` From 8e589474c6d6c711e6e7a6d17fef534444162f82 Mon Sep 17 00:00:00 2001 From: Alec <867245430@qq.com> Date: Tue, 5 Feb 2019 10:28:05 +0800 Subject: [PATCH 15/24] Update index.md Signed-off-by: naivewong <867245430@qq.com> --- docs/format/index.md | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/docs/format/index.md b/docs/format/index.md index bd8b8f1963..001f0b9ffe 100644 --- a/docs/format/index.md +++ b/docs/format/index.md @@ -176,24 +176,24 @@ The sequence of postings sections is finalized by an [offset table](#offset-tabl An offset table stores a sequence of entries that maps a list of strings to an offset. They are used to track label index and postings sections. They are read into memory when an index file is loaded. ``` -┌─────────────────────┬────────────────────┐ -│ len <4b> │ #entries <4b> │ -├─────────────────────┴────────────────────┤ -│ ┌──────────────────────────────────────┐ │ -│ │ n = #strs │ │ -│ ├──────────────────────┬───────────────┤ │ -│ │ len(str_1) │ str_1 │ │ -│ ├──────────────────────┴───────────────┤ │ -│ │ ... │ │ -│ ├──────────────────────┬───────────────┤ │ -│ │ len(str_n) │ str_n │ │ -│ ├──────────────────────┴───────────────┤ │ -│ │ offset │ │ -│ └──────────────────────────────────────┘ │ -│ . . . │ -├──────────────────────────────────────────┤ -│ CRC32 <4b> │ -└──────────────────────────────────────────┘ +┌─────────────────────┬──────────────────────┐ +│ len <4b> │ #entries <4b> │ +├─────────────────────┴──────────────────────┤ +│ ┌────────────────────────────────────────┐ │ +│ │ n = #strs │ │ +│ ├──────────────────────┬─────────────────┤ │ +│ │ len(str_1) │ str_1 │ │ +│ ├──────────────────────┴─────────────────┤ │ +│ │ ... │ │ +│ ├──────────────────────┬─────────────────┤ │ +│ │ len(str_n) │ str_n │ │ +│ ├──────────────────────┴─────────────────┤ │ +│ │ offset │ │ +│ └────────────────────────────────────────┘ │ +│ . . . │ +├────────────────────────────────────────────┤ +│ CRC32 <4b> │ +└────────────────────────────────────────────┘ ``` From 1f723a8eb5490f9988055cd2d6eab2458a61f410 Mon Sep 17 00:00:00 2001 From: naivewong <867245430@qq.com> Date: Tue, 5 Feb 2019 23:20:43 +0800 Subject: [PATCH 16/24] update #labels and #chunks Signed-off-by: naivewong <867245430@qq.com> --- docs/format/index.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/format/index.md b/docs/format/index.md index 001f0b9ffe..17472a62d7 100644 --- a/docs/format/index.md +++ b/docs/format/index.md @@ -92,7 +92,7 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc │ │ │ ┌────────────────────────────────────────────┐ │ │ │ │ │ │ ref(l_i.name) │ │ │ │ │ #labels │ ├────────────────────────────────────────────┤ ... │ │ -│ │ │ │ ref(l_i.value) │ │ │ +│ │ │ │ ref(l_i.value) │ │ │ │ │ │ └────────────────────────────────────────────┘ │ │ │ ├──────────────────┼────────────────────────────────────────────────────┤ │ │ │ │ ┌────────────────────────────────────────────┐ │ │ @@ -102,7 +102,7 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc │ │ │ ├────────────────────────────────────────────┤ │ │ │ │ │ │ ref(c_0.data) │ │ │ │ │ #chunks │ └────────────────────────────────────────────┘ │ │ -│ │ │ ┌────────────────────────────────────────────┐ │ │ +│ │ │ ┌────────────────────────────────────────────┐ │ │ │ │ │ │ c_i.mint - c_i-1.maxt │ │ │ │ │ │ ├────────────────────────────────────────────┤ │ │ │ │ │ │ c_i.maxt - c_i.mint │ │ │ From 0649dfddf0448f2280043d9461f3adab56290698 Mon Sep 17 00:00:00 2001 From: naivewong <867245430@qq.com> Date: Wed, 6 Feb 2019 00:05:30 +0800 Subject: [PATCH 17/24] more clear format representation Signed-off-by: naivewong <867245430@qq.com> --- docs/format/index.md | 60 +++++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/docs/format/index.md b/docs/format/index.md index 17472a62d7..c97b123bd1 100644 --- a/docs/format/index.md +++ b/docs/format/index.md @@ -85,34 +85,38 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc `mint` of the first chunk is stored, it's `maxt` is stored as a delta and the `mint` and `maxt` are encoded as deltas to the previous time for subsequent chunks. Similarly, the reference of the first chunk is stored and the next ref is stored as a delta to the previous one. ``` -┌───────────────────────────────────────────────────────────────────────────┐ -│ len │ -├───────────────────────────────────────────────────────────────────────────┤ -│ ┌──────────────────┬────────────────────────────────────────────────────┐ │ -│ │ │ ┌────────────────────────────────────────────┐ │ │ -│ │ │ │ ref(l_i.name) │ │ │ -│ │ #labels │ ├────────────────────────────────────────────┤ ... │ │ -│ │ │ │ ref(l_i.value) │ │ │ -│ │ │ └────────────────────────────────────────────┘ │ │ -│ ├──────────────────┼────────────────────────────────────────────────────┤ │ -│ │ │ ┌────────────────────────────────────────────┐ │ │ -│ │ │ │ c_0.mint │ │ │ -│ │ │ ├────────────────────────────────────────────┤ │ │ -│ │ │ │ c_0.maxt - c_0.mint │ │ │ -│ │ │ ├────────────────────────────────────────────┤ │ │ -│ │ │ │ ref(c_0.data) │ │ │ -│ │ #chunks │ └────────────────────────────────────────────┘ │ │ -│ │ │ ┌────────────────────────────────────────────┐ │ │ -│ │ │ │ c_i.mint - c_i-1.maxt │ │ │ -│ │ │ ├────────────────────────────────────────────┤ │ │ -│ │ │ │ c_i.maxt - c_i.mint │ │ │ -│ │ │ ├────────────────────────────────────────────┤ ... │ │ -│ │ │ │ ref(c_i.data) - ref(c_i-1.data) │ │ │ -│ │ │ └────────────────────────────────────────────┘ │ │ -│ └──────────────────┴────────────────────────────────────────────────────┘ │ -├───────────────────────────────────────────────────────────────────────────┤ -│ CRC32 <4b> │ -└───────────────────────────────────────────────────────────────────────────┘ +┌──────────────────────────────────────────────────────────────────────────┐ +│ len │ +├──────────────────────────────────────────────────────────────────────────┤ +│ ┌──────────────────────────────────────────────────────────────────────┐ │ +│ │ #labels │ │ +│ ├──────────────────────────────────────────────────────────────────────┤ │ +│ │ ┌────────────────────────────────────────────┐ │ │ +│ │ │ ref(l_i.name) │ │ │ +│ │ ├────────────────────────────────────────────┤ ... │ │ +│ │ │ ref(l_i.value) │ │ │ +│ │ └────────────────────────────────────────────┘ │ │ +│ ├──────────────────────────────────────────────────────────────────────┤ │ +│ │ #chunks │ │ +│ ├──────────────────────────────────────────────────────────────────────┤ │ +│ │ ┌────────────────────────────────────────────┐ │ │ +│ │ │ c_0.mint │ │ │ +│ │ ├────────────────────────────────────────────┤ │ │ +│ │ │ c_0.maxt - c_0.mint │ │ │ +│ │ ├────────────────────────────────────────────┤ │ │ +│ │ │ ref(c_0.data) │ │ │ +│ │ └────────────────────────────────────────────┘ │ │ +│ │ ┌────────────────────────────────────────────┐ │ │ +│ │ │ c_i.mint - c_i-1.maxt │ │ │ +│ │ ├────────────────────────────────────────────┤ │ │ +│ │ │ c_i.maxt - c_i.mint │ │ │ +│ │ ├────────────────────────────────────────────┤ ... │ │ +│ │ │ ref(c_i.data) - ref(c_i-1.data) │ │ │ +│ │ └────────────────────────────────────────────┘ │ │ +│ └──────────────────────────────────────────────────────────────────────┘ │ +├──────────────────────────────────────────────────────────────────────────┤ +│ CRC32 <4b> │ +└──────────────────────────────────────────────────────────────────────────┘ ``` From 42ee59689b8d8a44cf79886fe01d5b94ab96ceb5 Mon Sep 17 00:00:00 2001 From: naivewong <867245430@qq.com> Date: Wed, 6 Feb 2019 00:34:15 +0800 Subject: [PATCH 18/24] move dots to the bottom Signed-off-by: naivewong <867245430@qq.com> --- docs/format/index.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/format/index.md b/docs/format/index.md index c97b123bd1..4998d2b65c 100644 --- a/docs/format/index.md +++ b/docs/format/index.md @@ -93,9 +93,10 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc │ ├──────────────────────────────────────────────────────────────────────┤ │ │ │ ┌────────────────────────────────────────────┐ │ │ │ │ │ ref(l_i.name) │ │ │ -│ │ ├────────────────────────────────────────────┤ ... │ │ +│ │ ├────────────────────────────────────────────┤ │ │ │ │ │ ref(l_i.value) │ │ │ │ │ └────────────────────────────────────────────┘ │ │ +│ │ ... │ │ │ ├──────────────────────────────────────────────────────────────────────┤ │ │ │ #chunks │ │ │ ├──────────────────────────────────────────────────────────────────────┤ │ @@ -110,9 +111,10 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc │ │ │ c_i.mint - c_i-1.maxt │ │ │ │ │ ├────────────────────────────────────────────┤ │ │ │ │ │ c_i.maxt - c_i.mint │ │ │ -│ │ ├────────────────────────────────────────────┤ ... │ │ +│ │ ├────────────────────────────────────────────┤ │ │ │ │ │ ref(c_i.data) - ref(c_i-1.data) │ │ │ │ │ └────────────────────────────────────────────┘ │ │ +│ │ ... │ │ │ └──────────────────────────────────────────────────────────────────────┘ │ ├──────────────────────────────────────────────────────────────────────────┤ │ CRC32 <4b> │ From 8e7e2041d39e91989e23d245045172b357dfa98b Mon Sep 17 00:00:00 2001 From: naivewong <867245430@qq.com> Date: Wed, 6 Feb 2019 00:48:25 +0800 Subject: [PATCH 19/24] update #labels count, #chunks count Signed-off-by: naivewong <867245430@qq.com> --- docs/format/index.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/format/index.md b/docs/format/index.md index 4998d2b65c..5bcacbdaa6 100644 --- a/docs/format/index.md +++ b/docs/format/index.md @@ -89,7 +89,7 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc │ len │ ├──────────────────────────────────────────────────────────────────────────┤ │ ┌──────────────────────────────────────────────────────────────────────┐ │ -│ │ #labels │ │ +│ │ #labels count │ │ │ ├──────────────────────────────────────────────────────────────────────┤ │ │ │ ┌────────────────────────────────────────────┐ │ │ │ │ │ ref(l_i.name) │ │ │ @@ -98,7 +98,7 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc │ │ └────────────────────────────────────────────┘ │ │ │ │ ... │ │ │ ├──────────────────────────────────────────────────────────────────────┤ │ -│ │ #chunks │ │ +│ │ #chunks count │ │ │ ├──────────────────────────────────────────────────────────────────────┤ │ │ │ ┌────────────────────────────────────────────┐ │ │ │ │ │ c_0.mint │ │ │ From 9b227d27e75a70266d72c71107892e55ab68b247 Mon Sep 17 00:00:00 2001 From: naivewong <867245430@qq.com> Date: Wed, 6 Feb 2019 10:00:18 +0800 Subject: [PATCH 20/24] update labels count, chunks count Signed-off-by: naivewong <867245430@qq.com> --- docs/format/index.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/format/index.md b/docs/format/index.md index 5bcacbdaa6..1ec3c21b44 100644 --- a/docs/format/index.md +++ b/docs/format/index.md @@ -89,7 +89,7 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc │ len │ ├──────────────────────────────────────────────────────────────────────────┤ │ ┌──────────────────────────────────────────────────────────────────────┐ │ -│ │ #labels count │ │ +│ │ labels count │ │ │ ├──────────────────────────────────────────────────────────────────────┤ │ │ │ ┌────────────────────────────────────────────┐ │ │ │ │ │ ref(l_i.name) │ │ │ @@ -98,7 +98,7 @@ After the labels, the number of indexed chunks is encoded, followed by a sequenc │ │ └────────────────────────────────────────────┘ │ │ │ │ ... │ │ │ ├──────────────────────────────────────────────────────────────────────┤ │ -│ │ #chunks count │ │ +│ │ chunks count │ │ │ ├──────────────────────────────────────────────────────────────────────┤ │ │ │ ┌────────────────────────────────────────────┐ │ │ │ │ │ c_0.mint │ │ │ From 272fd0eabf147c89e158431a472109311ca37e9f Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Thu, 7 Feb 2019 15:06:10 +0100 Subject: [PATCH 21/24] Update Makefile.common Signed-off-by: Simon Pasquier --- Makefile.common | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Makefile.common b/Makefile.common index fff85f9226..a422e1b69a 100644 --- a/Makefile.common +++ b/Makefile.common @@ -36,7 +36,8 @@ GO_VERSION ?= $(shell $(GO) version) GO_VERSION_NUMBER ?= $(word 3, $(GO_VERSION)) PRE_GO_111 ?= $(shell echo $(GO_VERSION_NUMBER) | grep -E 'go1\.(10|[0-9])\.') -unexport GOVENDOR +GOVENDOR := +GO111MODULE := ifeq (, $(PRE_GO_111)) ifneq (,$(wildcard go.mod)) # Enforce Go modules support just in case the directory is inside GOPATH (and for Travis CI). @@ -57,8 +58,6 @@ $(warning Some recipes may not work as expected as the current Go runtime is '$( # This repository isn't using Go modules (yet). GOVENDOR := $(FIRST_GOPATH)/bin/govendor endif - - unexport GO111MODULE endif PROMU := $(FIRST_GOPATH)/bin/promu STATICCHECK := $(FIRST_GOPATH)/bin/staticcheck From d48606827c06abdf320bcee6a3df79b3708979b1 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 8 Feb 2019 13:35:32 +0200 Subject: [PATCH 22/24] simplify closers Signed-off-by: Krasi Georgiev --- block.go | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/block.go b/block.go index 26853ea304..08b4861f80 100644 --- a/block.go +++ b/block.go @@ -16,6 +16,7 @@ package tsdb import ( "encoding/json" + "io" "io/ioutil" "os" "path/filepath" @@ -273,6 +274,14 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er if logger == nil { logger = log.NewNopLogger() } + var closers []io.Closer + defer func() { + if err != nil { + for _, c := range closers { + c.Close() + } + } + }() meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -282,31 +291,19 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er if err != nil { return nil, err } - defer func() { - if err != nil { - cr.Close() - } - }() + closers = append(closers, cr) + ir, err := index.NewFileReader(filepath.Join(dir, indexFilename)) if err != nil { return nil, err } - - defer func() { - if err != nil { - ir.Close() - } - }() + closers = append(closers, ir) tr, tsr, err := readTombstones(dir) if err != nil { return nil, err } - defer func() { - if err != nil { - tr.Close() - } - }() + closers = append(closers, tr) // TODO refactor to set this at block creation time as // that would be the logical place for a block size to be calculated. From 07df4fd38339d27ae5a99290d98256d71e3a322a Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 11 Feb 2019 11:25:57 +0200 Subject: [PATCH 23/24] nits Signed-off-by: Krasi Georgiev --- compact_test.go | 5 ++--- db_test.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/compact_test.go b/compact_test.go index c22babe61a..d92efab1cd 100644 --- a/compact_test.go +++ b/compact_test.go @@ -746,7 +746,7 @@ func TestDisableAutoCompactions(t *testing.T) { testutil.Assert(t, len(db.Blocks()) > 0, "No block was persisted after the set timeout.") } -// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload imidiately after a compaction +// TestDeleteCompactionBlockAfterFailedReload ensures that a failed reload immediately after a compaction // deletes the resulting block to avoid creatings blocks with the same time range. func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { @@ -796,7 +796,7 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { expBlocks := bootStrap(db) - // Create a block that will trigger the reloard to fail. + // Create a block that will trigger the reload to fail. blockPath := createBlock(t, db.Dir(), genSeries(1, 1, 200, 300)) lastBlockIndex := path.Join(blockPath, indexFilename) actBlocks, err := blockDirs(db.Dir()) @@ -810,7 +810,6 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) { // Do the compaction and check the metrics. // Compaction should succeed, but the reload should fail and // the new block created from the compaction should be deleted. - db.EnableCompactions() testutil.NotOk(t, db.compact()) testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.reloadsFailed), "'failed db reload' count metrics mismatch") testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.ran), "`compaction` count metric mismatch") diff --git a/db_test.go b/db_test.go index 996ef3eb56..1b5cb35086 100644 --- a/db_test.go +++ b/db_test.go @@ -452,7 +452,7 @@ func TestDB_Snapshot(t *testing.T) { querier, err := db.Querier(mint, mint+1000) testutil.Ok(t, err) - defer func() { querier.Close() }() + defer func() { testutil.Ok(t, querier.Close()) }() // sum values seriesSet, err := querier.Select(labels.NewEqualMatcher("foo", "bar")) From bf2239079db8e7597a72cb1817714fdd099738dc Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 11 Feb 2019 11:57:46 +0200 Subject: [PATCH 24/24] refactor multi errors Signed-off-by: Krasi Georgiev --- block.go | 7 ++++--- chunks/chunks.go | 4 ++-- compact.go | 9 +++++++-- db.go | 2 +- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/block.go b/block.go index 08b4861f80..002ed1bebb 100644 --- a/block.go +++ b/block.go @@ -277,9 +277,10 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, er var closers []io.Closer defer func() { if err != nil { - for _, c := range closers { - c.Close() - } + var merr MultiError + merr.Add(err) + merr.Add(closeAll(closers)) + err = merr.Err() } }() meta, err := readMetaFile(dir) diff --git a/chunks/chunks.go b/chunks/chunks.go index 3f65bfa6aa..f0f5ac776e 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -342,7 +342,7 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) { } func (s *Reader) Close() error { - return closeAll(s.cs...) + return closeAll(s.cs) } // Size returns the size of the chunks. @@ -410,7 +410,7 @@ func sequenceFiles(dir string) ([]string, error) { return res, nil } -func closeAll(cs ...io.Closer) (err error) { +func closeAll(cs []io.Closer) (err error) { for _, c := range cs { if e := c.Close(); e != nil { err = e diff --git a/compact.go b/compact.go index 4025b12d74..265cda0b84 100644 --- a/compact.go +++ b/compact.go @@ -582,7 +582,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. -func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error { +func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) { if len(blocks) == 0 { return errors.New("cannot populate block from no readers") } @@ -592,7 +592,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, allSymbols = make(map[string]struct{}, 1<<16) closers = []io.Closer{} ) - defer func() { closeAll(closers...) }() + defer func() { + var merr MultiError + merr.Add(err) + merr.Add(closeAll(closers)) + err = merr.Err() + }() for i, b := range blocks { indexr, err := b.Index() diff --git a/db.go b/db.go index 3bce5b08ba..e8fd9c0aab 100644 --- a/db.go +++ b/db.go @@ -1097,7 +1097,7 @@ func (es MultiError) Err() error { return es } -func closeAll(cs ...io.Closer) error { +func closeAll(cs []io.Closer) error { var merr MultiError for _, c := range cs {