diff --git a/wal.go b/wal.go index a930252483..33793d782f 100644 --- a/wal.go +++ b/wal.go @@ -3,6 +3,7 @@ package tsdb import ( "bufio" "encoding/binary" + "hash" "hash/crc32" "io" "math" @@ -47,8 +48,9 @@ type WAL struct { flushInterval time.Duration segmentSize int64 - cur *bufio.Writer - curN int64 + crc32 hash.Hash32 + cur *bufio.Writer + curN int64 stopc chan struct{} donec chan struct{} @@ -79,6 +81,7 @@ func OpenWAL(dir string, l log.Logger, flushInterval time.Duration) (*WAL, error donec: make(chan struct{}), stopc: make(chan struct{}), segmentSize: walSegmentSizeBytes, + crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } if err := w.initSegments(); err != nil { return nil, err @@ -96,7 +99,7 @@ func (w *WAL) Reader() *WALReader { for _, f := range w.files { rs = append(rs, f) } - return &WALReader{rs: rs} + return NewWALReader(rs...) } // Log writes a batch of new series labels and samples to the log. @@ -301,8 +304,8 @@ func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error { } } - h := crc32.NewIEEE() - wr := io.MultiWriter(h, w.cur) + w.crc32.Reset() + wr := io.MultiWriter(w.crc32, w.cur) b := make([]byte, 6) b[0] = byte(et) @@ -316,7 +319,7 @@ func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error { if _, err := wr.Write(buf); err != nil { return err } - if _, err := w.cur.Write(h.Sum(nil)); err != nil { + if _, err := w.cur.Write(w.crc32.Sum(nil)); err != nil { return err } @@ -407,9 +410,10 @@ func (w *WAL) encodeSamples(samples []refdSample) error { // WALReader decodes and emits write ahead log entries. type WALReader struct { - rs []io.ReadCloser - cur int - buf []byte + rs []io.ReadCloser + cur int + buf []byte + crc32 hash.Hash32 err error labels []labels.Labels @@ -419,8 +423,9 @@ type WALReader struct { // NewWALReader returns a new WALReader over the sequence of the given ReadClosers. func NewWALReader(rs ...io.ReadCloser) *WALReader { return &WALReader{ - rs: rs, - buf: make([]byte, 0, 1024*1024), + rs: rs, + buf: make([]byte, 0, 128*4096), + crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } } @@ -483,8 +488,8 @@ func (r *WALReader) Next() bool { } func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { - cw := crc32.NewIEEE() - tr := io.TeeReader(cr, cw) + r.crc32.Reset() + tr := io.TeeReader(cr, r.crc32) b := make([]byte, 6) if _, err := tr.Read(b); err != nil { @@ -513,7 +518,7 @@ func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { if err != nil { return 0, 0, nil, err } - if exp, has := binary.BigEndian.Uint32(b[:4]), cw.Sum32(); has != exp { + if exp, has := binary.BigEndian.Uint32(b[:4]), r.crc32.Sum32(); has != exp { return 0, 0, nil, errors.Errorf("unexpected CRC32 checksum %x, want %x", has, exp) } diff --git a/writer.go b/writer.go index a73f2bd116..86bb75274e 100644 --- a/writer.go +++ b/writer.go @@ -3,6 +3,7 @@ package tsdb import ( "bufio" "encoding/binary" + "hash" "hash/crc32" "io" "sort" @@ -42,10 +43,11 @@ type SeriesWriter interface { // seriesWriter implements the SeriesWriter interface for the standard // serialization format. type seriesWriter struct { - ow io.Writer - w *bufio.Writer - n int64 - c int + ow io.Writer + w *bufio.Writer + n int64 + c int + crc32 hash.Hash index IndexWriter } @@ -55,6 +57,7 @@ func newSeriesWriter(w io.Writer, index IndexWriter) *seriesWriter { ow: w, w: bufio.NewWriterSize(w, 1*1024*1024), n: 0, + crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), index: index, } } @@ -82,9 +85,8 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []ChunkM } } - // TODO(fabxc): is crc32 enough for chunks of one series? - h := crc32.NewIEEE() - wr := io.MultiWriter(h, w.w) + w.crc32.Reset() + wr := io.MultiWriter(w.crc32, w.w) // For normal reads we don't need the number of the chunk section but // it allows us to verify checksums without reading the index file. @@ -117,7 +119,7 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []ChunkM chk.Chunk = nil } - if err := w.write(w.w, h.Sum(nil)); err != nil { + if err := w.write(w.w, w.crc32.Sum(nil)); err != nil { return err } @@ -195,6 +197,8 @@ type indexWriter struct { symbols map[string]uint32 // symbol offsets labelIndexes []hashEntry // label index offsets postings []hashEntry // postings lists offsets + + crc32 hash.Hash } func newIndexWriter(w io.Writer) *indexWriter { @@ -204,6 +208,7 @@ func newIndexWriter(w io.Writer) *indexWriter { n: 0, symbols: make(map[string]uint32, 4096), series: make(map[uint32]*indexWriterSeries, 4096), + crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } } @@ -215,8 +220,8 @@ func (w *indexWriter) write(wr io.Writer, b []byte) error { // section writes a CRC32 checksummed section of length l and guarded by flag. func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) error { - h := crc32.NewIEEE() - wr := io.MultiWriter(h, w.w) + w.crc32.Reset() + wr := io.MultiWriter(w.crc32, w.w) b := [5]byte{flag, 0, 0, 0, 0} binary.BigEndian.PutUint32(b[1:], l) @@ -228,7 +233,7 @@ func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) er if err := f(wr); err != nil { return errors.Wrap(err, "contents write func") } - if err := w.write(w.w, h.Sum(nil)); err != nil { + if err := w.write(w.w, w.crc32.Sum(nil)); err != nil { return errors.Wrap(err, "writing checksum") } return nil