diff --git a/tsdb/compact.go b/tsdb/compact.go index fc34b9bef9..3e57512df5 100644 --- a/tsdb/compact.go +++ b/tsdb/compact.go @@ -569,7 +569,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } } - indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename)) + indexw, err := index.NewWriter(c.ctx, filepath.Join(tmp, indexFilename)) if err != nil { return errors.Wrap(err, "open index writer") } diff --git a/tsdb/compact_test.go b/tsdb/compact_test.go index 09d75c803e..db90651099 100644 --- a/tsdb/compact_test.go +++ b/tsdb/compact_test.go @@ -956,8 +956,8 @@ func TestCancelCompactions(t *testing.T) { }() // Create some blocks to fall within the compaction range. - createBlock(t, tmpdir, genSeries(10, 10000, 0, 1000)) - createBlock(t, tmpdir, genSeries(10, 10000, 1000, 2000)) + createBlock(t, tmpdir, genSeries(1, 10000, 0, 1000)) + createBlock(t, tmpdir, genSeries(1, 10000, 1000, 2000)) createBlock(t, tmpdir, genSeries(1, 1, 2000, 2001)) // The most recent block is ignored so can be e small one. // Copy the db so we have an exact copy to compare compaction times. diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 882d00ef49..e51ff0e8d7 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -15,6 +15,7 @@ package index import ( "bufio" + "context" "encoding/binary" "hash" "hash/crc32" @@ -111,6 +112,7 @@ func newCRC32() hash.Hash32 { // Writer implements the IndexWriter interface for the standard // serialization format. type Writer struct { + ctx context.Context f *os.File fbuf *bufio.Writer pos uint64 @@ -176,7 +178,7 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) { } // NewWriter returns a new Writer to the given filename. It serializes data in format version 2. -func NewWriter(fn string) (*Writer, error) { +func NewWriter(ctx context.Context, fn string) (*Writer, error) { dir := filepath.Dir(fn) df, err := fileutil.OpenDir(dir) @@ -198,6 +200,7 @@ func NewWriter(fn string) (*Writer, error) { } iw := &Writer{ + ctx: ctx, f: f, fbuf: bufio.NewWriterSize(f, 1<<22), pos: 0, @@ -256,6 +259,12 @@ func (w *Writer) addPadding(size int) error { // ensureStage handles transitions between write stages and ensures that IndexWriter // methods are called in an order valid for the implementation. func (w *Writer) ensureStage(s indexWriterStage) error { + select { + case <-w.ctx.Done(): + return w.ctx.Err() + default: + } + if w.stage == s { return nil } @@ -699,6 +708,11 @@ func (w *Writer) writePostings() error { } } } + select { + case <-w.ctx.Done(): + return w.ctx.Err() + default: + } } return nil diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 7cd891abc4..d60af036cf 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -14,6 +14,7 @@ package index import ( + "context" "fmt" "io/ioutil" "math/rand" @@ -155,7 +156,7 @@ func TestIndexRW_Create_Open(t *testing.T) { fn := filepath.Join(dir, indexFilename) // An empty index must still result in a readable file. - iw, err := NewWriter(fn) + iw, err := NewWriter(context.Background(), fn) testutil.Ok(t, err) testutil.Ok(t, iw.Close()) @@ -183,7 +184,7 @@ func TestIndexRW_Postings(t *testing.T) { fn := filepath.Join(dir, indexFilename) - iw, err := NewWriter(fn) + iw, err := NewWriter(context.Background(), fn) testutil.Ok(t, err) series := []labels.Labels{ @@ -247,7 +248,7 @@ func TestPostingsMany(t *testing.T) { fn := filepath.Join(dir, indexFilename) - iw, err := NewWriter(fn) + iw, err := NewWriter(context.Background(), fn) testutil.Ok(t, err) // Create a label in the index which has 999 values. @@ -368,7 +369,7 @@ func TestPersistence_index_e2e(t *testing.T) { }) } - iw, err := NewWriter(filepath.Join(dir, indexFilename)) + iw, err := NewWriter(context.Background(), filepath.Join(dir, indexFilename)) testutil.Ok(t, err) testutil.Ok(t, iw.AddSymbols(symbols))