diff --git a/index b/index deleted file mode 160000 index be0456338d..0000000000 --- a/index +++ /dev/null @@ -1 +0,0 @@ -Subproject commit be0456338d4d41ce559ba8371c3c6943df49c863 diff --git a/index/LICENSE b/index/LICENSE new file mode 100644 index 0000000000..261eeb9e9f --- /dev/null +++ b/index/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." 
+ + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. 
We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/index/cmd/index/.gitignore b/index/cmd/index/.gitignore
new file mode 100644
index 0000000000..cc4be6ea23
--- /dev/null
+++ b/index/cmd/index/.gitignore
@@ -0,0 +1,3 @@
+index
+benchout
+testdata*
diff --git a/index/cmd/index/Makefile b/index/cmd/index/Makefile
new file mode 100644
index 0000000000..cc2156b3fc
--- /dev/null
+++ b/index/cmd/index/Makefile
@@ -0,0 +1,14 @@
+all: bench svg
+
+bench: build
+	@echo ">> running benchmark"
+	@./index bench write testdata
+
+build:
+	@go build .
+
+svg:
+	@echo ">> create svgs"
+	@go tool pprof -svg ./index benchout/cpu.prof > benchout/cpuprof.svg
+	@go tool pprof -svg ./index benchout/mem.prof > benchout/memprof.svg
+	@go tool pprof -svg ./index benchout/block.prof > benchout/blockprof.svg
diff --git a/index/cmd/index/main.go b/index/cmd/index/main.go
new file mode 100644
index 0000000000..698a3eb938
--- /dev/null
+++ b/index/cmd/index/main.go
@@ -0,0 +1,253 @@
+package main
+
+import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"runtime"
+	"runtime/pprof"
+	"time"
+
+	"github.com/fabxc/tsdb/index"
+	dto "github.com/prometheus/client_model/go"
+	"github.com/prometheus/common/expfmt"
+	"github.com/spf13/cobra"
+)
+
+func main() {
+	root := &cobra.Command{
+		Use:   "index",
+		Short: "CLI tool for index",
+	}
+
+	root.AddCommand(
+		NewBenchCommand(),
+	)
+
+	root.Execute()
+}
+
+func NewBenchCommand() *cobra.Command {
+	c := &cobra.Command{
+		Use:   "bench",
+		Short: "run benchmarks",
+	}
+	c.AddCommand(NewBenchWriteCommand())
+
+	return c
+}
+
+type writeBenchmark struct {
+	outPath string
+	cleanup bool
+
+	cpuprof   *os.File
+	memprof   *os.File
+	blockprof *os.File
+}
+
+func NewBenchWriteCommand() *cobra.Command {
+	var wb writeBenchmark
+	c := &cobra.Command{
+		Use:   "write <file>",
+		Short: "run a write performance benchmark",
+		Run:   wb.run,
+	}
+	c.PersistentFlags().StringVar(&wb.outPath, "out", "benchout/", "set the output path")
+
+	return c
+}
+
+func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
+	if len(args) != 1 {
+		exitWithError(fmt.Errorf("missing file argument"))
+	}
+	if b.outPath == "" {
+		dir, err := ioutil.TempDir("", "index_bench")
+		if err != nil {
+			exitWithError(err)
+		}
+		b.outPath = dir
+		b.cleanup = true
+	}
+	if err := os.RemoveAll(b.outPath); err != nil {
+		exitWithError(err)
+	}
+	if err := os.MkdirAll(b.outPath, 0777); err != nil {
+		exitWithError(err)
+	}
+
+	var docs []*InsertDoc
+
+	measureTime("readData", func() {
+		f, err := os.Open(args[0])
+		if err != nil {
+			exitWithError(err)
+		}
+		defer f.Close()
+
+		docs, err = readPrometheusLabels(f)
+		if err != nil {
+			exitWithError(err)
+		}
+	})
+
+	dir := filepath.Join(b.outPath, "ix")
+
+	ix, err := index.Open(dir, nil)
+	if err != nil {
+		exitWithError(err)
+	}
+	defer 
func() { + ix.Close() + reportSize(dir) + if b.cleanup { + os.RemoveAll(b.outPath) + } + }() + + measureTime("indexData", func() { + b.startProfiling() + indexDocs(ix, docs, 100000) + indexDocs(ix, docs, 100000) + indexDocs(ix, docs, 100000) + indexDocs(ix, docs, 100000) + b.stopProfiling() + }) +} + +func (b *writeBenchmark) startProfiling() { + var err error + + // Start CPU profiling. + b.cpuprof, err = os.Create(filepath.Join(b.outPath, "cpu.prof")) + if err != nil { + exitWithError(fmt.Errorf("bench: could not create cpu profile: %v\n", err)) + } + pprof.StartCPUProfile(b.cpuprof) + + // Start memory profiling. + b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof")) + if err != nil { + exitWithError(fmt.Errorf("bench: could not create memory profile: %v\n", err)) + } + runtime.MemProfileRate = 4096 + + // Start fatal profiling. + b.blockprof, err = os.Create(filepath.Join(b.outPath, "block.prof")) + if err != nil { + exitWithError(fmt.Errorf("bench: could not create block profile: %v\n", err)) + } + runtime.SetBlockProfileRate(1) +} + +func (b *writeBenchmark) stopProfiling() { + if b.cpuprof != nil { + pprof.StopCPUProfile() + b.cpuprof.Close() + b.cpuprof = nil + } + if b.memprof != nil { + pprof.Lookup("heap").WriteTo(b.memprof, 0) + b.memprof.Close() + b.memprof = nil + } + if b.blockprof != nil { + pprof.Lookup("block").WriteTo(b.blockprof, 0) + b.blockprof.Close() + b.blockprof = nil + runtime.SetBlockProfileRate(0) + } +} + +func indexDocs(ix *index.Index, docs []*InsertDoc, batchSize int) { + remDocs := docs[:] + var ids []index.DocID + + for len(remDocs) > 0 { + n := batchSize + if n > len(remDocs) { + n = len(remDocs) + } + + b, err := ix.Batch() + if err != nil { + exitWithError(err) + } + for _, d := range remDocs[:n] { + id := b.Add(d.Terms) + ids = append(ids, id) + } + if err := b.Commit(); err != nil { + exitWithError(err) + } + + remDocs = remDocs[n:] + } +} + +func reportSize(dir string) { + err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil || path == dir { + return err + } + fmt.Printf(" > file=%s size=%.03fGiB\n", path[len(dir):], float64(info.Size())/1024/1024/1024) + return nil + }) + if err != nil { + exitWithError(err) + } +} + +func measureTime(stage string, f func()) { + fmt.Printf(">> start stage=%s\n", stage) + start := time.Now() + f() + fmt.Printf(">> completed stage=%s duration=%s\n", stage, time.Since(start)) +} + +type InsertDoc struct { + Terms index.Terms +} + +func readPrometheusLabels(r io.Reader) ([]*InsertDoc, error) { + dec := expfmt.NewDecoder(r, expfmt.FmtProtoText) + + var docs []*InsertDoc + var mf dto.MetricFamily + + for { + if err := dec.Decode(&mf); err != nil { + if err == io.EOF { + break + } + return nil, err + } + + for _, m := range mf.GetMetric() { + d := &InsertDoc{ + Terms: make(index.Terms, len(m.GetLabel())+1), + } + d.Terms[0] = index.Term{ + Field: "__name__", + Val: mf.GetName(), + } + for i, l := range m.GetLabel() { + d.Terms[i+1] = index.Term{ + Field: l.GetName(), + Val: l.GetValue(), + } + } + docs = append(docs, d) + } + } + + return docs, nil +} + +func exitWithError(err error) { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) +} diff --git a/index/coding.go b/index/coding.go new file mode 100644 index 0000000000..fb8ec4da73 --- /dev/null +++ b/index/coding.go @@ -0,0 +1,206 @@ +package index + +import ( + "encoding/binary" + "errors" + "io" + "sync" + + "github.com/boltdb/bolt" +) + +var encpool buffers + +type buffers struct { + pool sync.Pool +} + +func (b 
*buffers) get(l int) []byte { + x := b.pool.Get() + if x == nil { + return make([]byte, l) + } + buf := x.([]byte) + if cap(buf) < l { + return make([]byte, l) + } + return buf[:l] +} + +func (b *buffers) getZero(l int) []byte { + buf := b.get(l) + for i := range buf { + buf[i] = 0 + } + return buf +} + +func (b *buffers) put(buf []byte) { + b.pool.Put(buf) +} + +func (b *buffers) bucketPut(bkt *bolt.Bucket, k, v []byte) error { + err := bkt.Put(k, v) + b.put(k) + return err +} + +func (b *buffers) bucketGet(bkt *bolt.Bucket, k []byte) []byte { + v := bkt.Get(k) + b.put(k) + return v +} + +func (b *buffers) uint64be(x uint64) []byte { + buf := b.get(8) + binary.BigEndian.PutUint64(buf, x) + return buf +} + +func (b *buffers) uvarint(x uint64) []byte { + buf := b.get(binary.MaxVarintLen64) + return buf[:binary.PutUvarint(buf, x)] +} + +type txbuffs struct { + buffers *buffers + done [][]byte +} + +func (b *txbuffs) get(l int) []byte { + buf := b.buffers.get(l) + b.done = append(b.done, buf) + return buf +} + +func (b *txbuffs) getZero(l int) []byte { + buf := b.buffers.getZero(l) + b.done = append(b.done, buf) + return buf +} + +func (b *txbuffs) release() { + for _, buf := range b.done { + b.buffers.put(buf) + } +} + +func (b *txbuffs) put(buf []byte) { + b.done = append(b.done, buf) +} + +func (b *txbuffs) uint64be(x uint64) []byte { + buf := b.get(8) + binary.BigEndian.PutUint64(buf, x) + return buf +} + +func (b *txbuffs) uvarint(x uint64) []byte { + buf := b.get(binary.MaxVarintLen64) + return buf[:binary.PutUvarint(buf, x)] +} + +// reuse of buffers +var pagePool sync.Pool + +// getBuf returns a buffer from the pool. The length of the returned slice is l. +func getPage(l int) []byte { + x := pagePool.Get() + if x == nil { + return make([]byte, l) + } + buf := x.([]byte) + if cap(buf) < l { + return make([]byte, l) + } + return buf[:l] +} + +// putBuf returns a buffer to the pool. +func putPage(buf []byte) { + pagePool.Put(buf) +} + +// bufPool is a pool for staging buffers. Using a pool allows concurrency-safe +// reuse of buffers +var bufPool sync.Pool + +// getBuf returns a buffer from the pool. The length of the returned slice is l. +func getBuf(l int) []byte { + x := bufPool.Get() + if x == nil { + return make([]byte, l) + } + buf := x.([]byte) + if cap(buf) < l { + return make([]byte, l) + } + return buf[:l] +} + +// putBuf returns a buffer to the pool. 
+func putBuf(buf []byte) {
+	bufPool.Put(buf)
+}
+
+func encodeUint64(x uint64) []byte {
+	buf := getBuf(8)
+	binary.BigEndian.PutUint64(buf, x)
+	return buf
+}
+
+func decodeUint64(buf []byte) uint64 {
+	return binary.BigEndian.Uint64(buf)
+}
+
+func writeUvarint(w io.ByteWriter, x uint64) (i int, err error) {
+	for x >= 0x80 {
+		if err = w.WriteByte(byte(x) | 0x80); err != nil {
+			return i, err
+		}
+		x >>= 7
+		i++
+	}
+	if err = w.WriteByte(byte(x)); err != nil {
+		return i, err
+	}
+	return i + 1, err
+}
+
+func writeVarint(w io.ByteWriter, x int64) (i int, err error) {
+	ux := uint64(x) << 1
+	if x < 0 {
+		ux = ^ux
+	}
+	return writeUvarint(w, ux)
+}
+
+func readUvarint(r io.ByteReader) (uint64, int, error) {
+	var (
+		x uint64
+		s uint
+	)
+	for i := 0; ; i++ {
+		b, err := r.ReadByte()
+		if err != nil {
+			return x, i, err
+		}
+		if b < 0x80 {
+			if i > 9 || i == 9 && b > 1 {
+				return x, i + 1, errors.New("varint overflows a 64-bit integer")
+			}
+			return x | uint64(b)<<s, i + 1, nil
+		}
+		x |= uint64(b&0x7f) << s
+		s += 7
+	}
+}
+
+func readVarint(r io.ByteReader) (int64, int, error) {
+	ux, n, err := readUvarint(r)
+	x := int64(ux >> 1)
+	if ux&1 != 0 {
+		x = ^x
+	}
+	return x, n, err
+}
diff --git a/index/index.go b/index/index.go
new file mode 100644
index 0000000000..034cd557ee
--- /dev/null
+++ b/index/index.go
@@ -0,0 +1,716 @@
+package index
+
+import (
+	"bytes"
+	"encoding/binary"
+	"encoding/gob"
+	"errors"
+	"fmt"
+	"io"
+	"math"
+	"os"
+	"path/filepath"
+	"regexp"
+	"sync"
+
+	"github.com/boltdb/bolt"
+	"github.com/fabxc/tsdb/pages"
+)
+
+var (
+	errOutOfOrder = errors.New("out of order")
+	errNotFound   = errors.New("not found")
+)
+
+// Options for an Index.
+type Options struct {
+}
+
+// DefaultOptions used for opening a new index.
+var DefaultOptions = &Options{}
+
+// Index is a fully persistent inverted index of documents with any number of fields
+// that map to exactly one term.
+type Index struct {
+	pbuf *pages.DB
+	bolt *bolt.DB
+	meta *meta
+
+	rwlock sync.Mutex
+}
+
+// Open returns an index located in the given path. If none exists a new
+// one is created.
+func Open(path string, opts *Options) (*Index, error) {
+	if opts == nil {
+		opts = DefaultOptions
+	}
+
+	if err := os.MkdirAll(path, 0777); err != nil {
+		return nil, err
+	}
+
+	bdb, err := bolt.Open(filepath.Join(path, "kv"), 0666, nil)
+	if err != nil {
+		return nil, err
+	}
+	pdb, err := pages.Open(filepath.Join(path, "pb"), 0666, &pages.Options{
+		PageSize: pageSize,
+	})
+	if err != nil {
+		return nil, err
+	}
+	ix := &Index{
+		bolt: bdb,
+		pbuf: pdb,
+		meta: &meta{},
+	}
+	if err := ix.bolt.Update(ix.init); err != nil {
+		return nil, err
+	}
+	return ix, nil
+}
+
+// Close closes the index.
+func (ix *Index) Close() error {
+	err0 := ix.pbuf.Close()
+	err1 := ix.bolt.Close()
+	if err0 != nil {
+		return err0
+	}
+	return err1
+}
+
+var (
+	bktMeta     = []byte("meta")
+	bktDocs     = []byte("docs")
+	bktTerms    = []byte("terms")
+	bktTermIDs  = []byte("term_ids")
+	bktSkiplist = []byte("skiplist")
+
+	keyMeta = []byte("meta")
+)
+
+func (ix *Index) init(tx *bolt.Tx) error {
+	// Ensure all buckets exist. Any other index methods assume
+	// that these buckets exist and may panic otherwise.
+	for _, bn := range [][]byte{
+		bktMeta, bktTerms, bktTermIDs, bktDocs, bktSkiplist,
+	} {
+		if _, err := tx.CreateBucketIfNotExists(bn); err != nil {
+			return fmt.Errorf("create bucket %q failed: %s", string(bn), err)
+		}
+	}
+
+	// Read the meta state if the index was already initialized.
+ mbkt := tx.Bucket(bktMeta) + if v := mbkt.Get(keyMeta); v != nil { + if err := ix.meta.read(v); err != nil { + return fmt.Errorf("decoding meta failed: %s", err) + } + } else { + // Index not initialized yet, set up meta information. + ix.meta = &meta{ + LastDocID: 0, + LastTermID: 0, + } + v, err := ix.meta.bytes() + if err != nil { + return fmt.Errorf("encoding meta failed: %s", err) + } + if err := mbkt.Put(keyMeta, v); err != nil { + return fmt.Errorf("creating meta failed: %s", err) + } + } + + return nil +} + +// Querier starts a new query session against the index. +func (ix *Index) Querier() (*Querier, error) { + kvtx, err := ix.bolt.Begin(false) + if err != nil { + return nil, err + } + pbtx, err := ix.pbuf.Begin(false) + if err != nil { + kvtx.Rollback() + return nil, err + } + return &Querier{ + kvtx: kvtx, + pbtx: pbtx, + // TODO(fabxc): consider getting these buckets lazily. + termBkt: kvtx.Bucket(bktTerms), + termidBkt: kvtx.Bucket(bktTermIDs), + docBkt: kvtx.Bucket(bktDocs), + skiplistBkt: kvtx.Bucket(bktSkiplist), + }, nil +} + +// Querier encapsulates the index for several queries. +type Querier struct { + kvtx *bolt.Tx + pbtx *pages.Tx + + termBkt *bolt.Bucket + termidBkt *bolt.Bucket + docBkt *bolt.Bucket + skiplistBkt *bolt.Bucket +} + +// Close closes the underlying index transactions. +func (q *Querier) Close() error { + err0 := q.pbtx.Rollback() + err1 := q.kvtx.Rollback() + if err0 != nil { + return err0 + } + return err1 +} + +// Terms returns all terms for the key field matching the provided matcher. +// If the matcher is nil, all terms for the field are returned. +func (q *Querier) Terms(key string, m Matcher) []string { + if m == nil { + m = AnyMatcher + } + return q.termsForMatcher(key, m) +} + +// Search returns an iterator over all document IDs that match all +// provided matchers. +func (q *Querier) Search(key string, m Matcher) (Iterator, error) { + tids := q.termIDsForMatcher(key, m) + its := make([]Iterator, 0, len(tids)) + + for _, t := range tids { + it, err := q.postingsIter(t) + if err != nil { + return nil, err + } + its = append(its, it) + } + + if len(its) == 0 { + return nil, nil + } + return Merge(its...), nil +} + +// postingsIter returns an iterator over the postings list of term t. +func (q *Querier) postingsIter(t termid) (Iterator, error) { + b := q.skiplistBkt.Bucket(t.bytes()) + if b == nil { + return nil, errNotFound + } + + it := &skippingIterator{ + skiplist: &boltSkiplistCursor{ + k: uint64(t), + c: b.Cursor(), + bkt: b, + }, + iterators: iteratorStoreFunc(func(k uint64) (Iterator, error) { + data, err := q.pbtx.Get(k) + if err != nil { + return nil, errNotFound + } + // TODO(fabxc): for now, offset is zero, pages have no header + // and are always delta encoded. + return newPageDelta(data).cursor(), nil + }), + } + + return it, nil +} + +func (q *Querier) termsForMatcher(key string, m Matcher) []string { + c := q.termBkt.Cursor() + pref := append([]byte(key), 0xff) + + var terms []string + // TODO(fabxc): We scan the entire term value range for the field. Improvide this by direct + // and prefixed seeks depending on the matcher. + for k, _ := c.Seek(pref); bytes.HasPrefix(k, pref); k, _ = c.Next() { + if m.Match(string(k[len(pref):])) { + terms = append(terms, string(k[len(pref):])) + } + } + return terms +} + +func (q *Querier) termIDsForMatcher(key string, m Matcher) termids { + c := q.termBkt.Cursor() + pref := append([]byte(key), 0xff) + + var ids termids + // TODO(fabxc): We scan the entire term value range for the field. 
Improvide this by direct + // and prefixed seeks depending on the matcher. + for k, v := c.Seek(pref); bytes.HasPrefix(k, pref); k, v = c.Next() { + if m.Match(string(k[len(pref):])) { + ids = append(ids, newTermID(v)) + } + } + return ids +} + +// Doc returns the document with the given ID. +func (q *Querier) Doc(id DocID) (Terms, error) { + v := q.docBkt.Get(id.bytes()) + if v == nil { + return nil, errNotFound + } + tids := newTermIDs(v) + + // TODO(fabxc): consider at least a per-session cache for these. + terms := make(Terms, len(tids)) + for i, t := range tids { + // TODO(fabxc): is this encode/decode cycle here worth the space savings? + // If we stored plain uint64s we can just pass the slice back in. + v := q.termidBkt.Get(t.bytes()) + if v == nil { + return nil, fmt.Errorf("term not found") + } + term, err := newTerm(v) + if err != nil { + return nil, err + } + terms[i] = term + } + return terms, nil +} + +// Delete removes all documents in the iterator from the index. +// It returns the number of deleted documents. +func (ix *Index) Delete(it Iterator) (int, error) { + panic("not implemented") +} + +// Batch starts a new batch against the index. +func (ix *Index) Batch() (*Batch, error) { + // Lock writes so we can safely pre-allocate term and doc IDs. + ix.rwlock.Lock() + + tx, err := ix.bolt.Begin(false) + if err != nil { + return nil, err + } + b := &Batch{ + ix: ix, + tx: tx, + meta: &meta{}, + termBkt: tx.Bucket(bktTerms), + termidBkt: tx.Bucket(bktTermIDs), + terms: map[Term]*batchTerm{}, + } + *b.meta = *ix.meta + return b, nil +} + +// meta contains information about the state of the index. +type meta struct { + LastDocID DocID + LastTermID termid +} + +// read initilizes the meta from a byte slice. +func (m *meta) read(b []byte) error { + return gob.NewDecoder(bytes.NewReader(b)).Decode(m) +} + +// bytes returns a byte slice representation of the meta. +func (m *meta) bytes() ([]byte, error) { + var buf bytes.Buffer + if err := gob.NewEncoder(&buf).Encode(m); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +// Terms is a sortable list of terms. +type Terms []Term + +func (t Terms) Len() int { return len(t) } +func (t Terms) Swap(i, j int) { t[i], t[j] = t[j], t[i] } + +func (t Terms) Less(i, j int) bool { + if t[i].Field < t[j].Field { + return true + } + if t[i].Field > t[j].Field { + return false + } + return t[i].Val < t[j].Val +} + +// Term is a term for the specified field. +type Term struct { + Field, Val string +} + +func newTerm(b []byte) (t Term, e error) { + c := bytes.SplitN(b, []byte{0xff}, 2) + if len(c) != 2 { + return t, fmt.Errorf("invalid term") + } + t.Field = string(c[0]) + t.Val = string(c[1]) + return t, nil +} + +// bytes returns a byte slice representation of the term. +func (t *Term) bytes() []byte { + b := make([]byte, 0, len(t.Field)+1+len(t.Val)) + b = append(b, []byte(t.Field)...) + b = append(b, 0xff) + return append(b, []byte(t.Val)...) +} + +// Matcher checks whether a value for a key satisfies a check condition. +type Matcher interface { + Match(value string) bool +} + +// AnyMatcher matches any term value for a field. +var AnyMatcher = anyMatcher{} + +type anyMatcher struct{} + +func (anyMatcher) Match(_ string) bool { + return true +} + +// EqualMatcher matches exactly one value for a particular label. 
+type EqualMatcher struct { + val string +} + +func NewEqualMatcher(val string) *EqualMatcher { + return &EqualMatcher{val: val} +} + +func (m *EqualMatcher) Match(s string) bool { return m.val == s } + +// RegexpMatcher matches labels for the fixed key for which the value +// matches a regular expression. +type RegexpMatcher struct { + re *regexp.Regexp +} + +func NewRegexpMatcher(expr string) (*RegexpMatcher, error) { + re, err := regexp.Compile(expr) + if err != nil { + return nil, err + } + return &RegexpMatcher{re: re}, nil +} + +func (m *RegexpMatcher) Match(s string) bool { return m.re.MatchString(s) } + +// DocID is a unique identifier for a document. +type DocID uint64 + +func newDocID(b []byte) DocID { + return DocID(decodeUint64(b)) +} + +func (d DocID) bytes() []byte { + return encodeUint64(uint64(d)) +} + +type termid uint64 + +func newTermID(b []byte) termid { + return termid(decodeUint64(b)) +} + +func (t termid) bytes() []byte { + return encodeUint64(uint64(t)) +} + +type termids []termid + +func (t termids) Len() int { return len(t) } +func (t termids) Swap(i, j int) { t[i], t[j] = t[j], t[i] } +func (t termids) Less(i, j int) bool { return t[i] < t[j] } + +// newTermIDs reads a sequence of uvarints from b and appends them +// to the term IDs. +func newTermIDs(b []byte) (t termids) { + for len(b) > 0 { + k, n := binary.Uvarint(b) + t = append(t, termid(k)) + b = b[n:] + } + return t +} + +// bytes encodes the term IDs as a sequence of uvarints. +func (t termids) bytes() []byte { + b := make([]byte, len(t)*binary.MaxVarintLen64) + n := 0 + for _, x := range t { + n += binary.PutUvarint(b[n:], uint64(x)) + } + return b[:n] +} + +// Batch collects multiple indexing actions and allows to apply them +// to the persistet index all at once for improved performance. +type Batch struct { + ix *Index + tx *bolt.Tx + meta *meta + + termBkt *bolt.Bucket + termidBkt *bolt.Bucket + + docs []*batchDoc + terms map[Term]*batchTerm +} + +type batchDoc struct { + id DocID + terms termids +} + +type batchTerm struct { + id termid // zero if term has not been added yet + docs []DocID // documents to be indexed for the term +} + +// Add adds a new document with the given terms to the index and +// returns a new unique ID for it. +// The ID only becomes valid after the batch has been committed successfully. +func (b *Batch) Add(terms Terms) DocID { + b.meta.LastDocID++ + id := b.meta.LastDocID + tids := make(termids, 0, len(terms)) + + // Subtract last document ID before this batch was started. + for _, t := range terms { + tids = append(tids, b.addTerm(id, t)) + } + + b.docs = append(b.docs, &batchDoc{id: id, terms: tids}) + return id +} + +// SecondaryIndex indexes the document ID for additional terms. The temrs +// are not stored as part of the document's forward index as the initial terms. +// The caller has to ensure that the document IDs are added to terms in +// increasing order. +func (b *Batch) SecondaryIndex(id DocID, terms ...Term) { + for _, t := range terms { + b.addTerm(id, t) + } +} + +// addTerm adds the document ID to the term's postings list and returns +// the Term's ID. +func (b *Batch) addTerm(id DocID, t Term) termid { + tb := b.terms[t] + // Populate term if necessary and allocate a new ID if it + // hasn't been created in the database before. 
+ if tb == nil { + tb = &batchTerm{docs: make([]DocID, 0, 1024)} + b.terms[t] = tb + + if idb := b.termBkt.Get(t.bytes()); idb != nil { + tb.id = termid(decodeUint64(idb)) + } else { + b.meta.LastTermID++ + tb.id = b.meta.LastTermID + } + } + tb.docs = append(tb.docs, id) + return tb.id +} + +// Commit executes the batched indexing against the underlying index. +func (b *Batch) Commit() error { + defer b.ix.rwlock.Unlock() + // Close read transaction to open a write transaction. The outer rwlock + // stil guards against intermittend writes between switching. + if err := b.tx.Rollback(); err != nil { + return err + } + err := b.ix.bolt.Update(func(tx *bolt.Tx) error { + docsBkt := tx.Bucket(bktDocs) + // Add document IDs to forward index, + for _, d := range b.docs { + if err := docsBkt.Put(d.id.bytes(), d.terms.bytes()); err != nil { + return err + } + } + // Add newly allocated terms. + termBkt := tx.Bucket(bktTerms) + termidBkt := tx.Bucket(bktTermIDs) + + for t, tb := range b.terms { + if tb.id > b.ix.meta.LastTermID { + bid := encodeUint64(uint64(tb.id)) + tby := t.bytes() + + if err := termBkt.Put(tby, bid); err != nil { + return fmt.Errorf("setting term failed: %s", err) + } + if err := termidBkt.Put(bid, tby); err != nil { + return fmt.Errorf("setting term failed: %s", err) + } + } + } + + pbtx, err := b.ix.pbuf.Begin(true) + if err != nil { + return err + } + if err := b.writePostingsBatch(tx, pbtx); err != nil { + pbtx.Rollback() + return err + } + if err := pbtx.Commit(); err != nil { + return err + } + return b.updateMeta(tx) + }) + return err +} + +// Rollback drops all changes applied in the batch. +func (b *Batch) Rollback() error { + b.ix.rwlock.Unlock() + return b.tx.Rollback() +} + +// writePostings adds the postings batch to the index. +func (b *Batch) writePostingsBatch(kvtx *bolt.Tx, pbtx *pages.Tx) error { + skiplist := kvtx.Bucket(bktSkiplist) + + // createPage allocates a new delta-encoded page starting with id as its first entry. + createPage := func(id DocID) (page, error) { + pg := newPageDelta(make([]byte, pageSize-pages.PageHeaderSize)) + if err := pg.init(id); err != nil { + return nil, err + } + return pg, nil + } + + for _, tb := range b.terms { + ids := tb.docs + + b, err := skiplist.CreateBucketIfNotExists(tb.id.bytes()) + if err != nil { + return err + } + sl := &boltSkiplistCursor{ + k: uint64(tb.id), + c: b.Cursor(), + bkt: b, + } + + var ( + pg page // Page we are currently appending to. + pc pageCursor // Its cursor. + pid uint64 // Its ID. + ) + // Get the most recent page. If none exist, the entire postings list is new. + _, pid, err = sl.seek(math.MaxUint64) + if err != nil { + if err != io.EOF { + return err + } + // No most recent page for the key exists. The postings list is new and + // we have to allocate a new page ID for it. + if pg, err = createPage(ids[0]); err != nil { + return err + } + pc = pg.cursor() + ids = ids[1:] + } else { + // Load the most recent page. + pdata, err := pbtx.Get(pid) + if pdata == nil { + return fmt.Errorf("error getting page for ID %q: %s", pid, err) + } + + pdatac := make([]byte, len(pdata)) + // The byte slice is mmaped from bolt. We have to copy it to make modifications. + // pdatac := make([]byte, len(pdata)) + copy(pdatac, pdata) + + pg = newPageDelta(pdatac) + pc = pg.cursor() + } + + for i := 0; i < len(ids); i++ { + if err = pc.append(ids[i]); err == errPageFull { + // We couldn't append to the page because it was full. + // Store away the old page... + if pid == 0 { + // The page was new. 
+ pid, err = pbtx.Add(pg.data()) + if err != nil { + return err + } + first, err := pc.Seek(0) + if err != nil { + return err + } + if err := sl.append(first, pid); err != nil { + return err + } + } else { + if err = pbtx.Set(pid, pg.data()); err != nil { + return err + } + } + + // ... and allocate a new page. + pid = 0 + if pg, err = createPage(ids[i]); err != nil { + return err + } + pc = pg.cursor() + } else if err != nil { + return err + } + } + // Save the last page we have written to. + if pid == 0 { + // The page was new. + pid, err = pbtx.Add(pg.data()) + if err != nil { + return err + } + first, err := pc.Seek(0) + if err != nil { + return err + } + if err := sl.append(first, pid); err != nil { + return err + } + } else { + if err = pbtx.Set(pid, pg.data()); err != nil { + return err + } + } + } + return nil +} + +// updateMeta updates the index's meta information based on the changes +// applied with the batch. +func (b *Batch) updateMeta(tx *bolt.Tx) error { + b.ix.meta = b.meta + bkt := tx.Bucket([]byte(bktMeta)) + if bkt == nil { + return fmt.Errorf("meta bucket not found") + } + v, err := b.ix.meta.bytes() + if err != nil { + return fmt.Errorf("error encoding meta: %s", err) + } + return bkt.Put([]byte(keyMeta), v) +} diff --git a/index/index_test.go b/index/index_test.go new file mode 100644 index 0000000000..60a0228e1e --- /dev/null +++ b/index/index_test.go @@ -0,0 +1 @@ +package index diff --git a/index/iter.go b/index/iter.go new file mode 100644 index 0000000000..cf44f36780 --- /dev/null +++ b/index/iter.go @@ -0,0 +1,294 @@ +package index + +import ( + "io" + "sort" +) + +// An Iterator provides sorted iteration over a list of uint64s. +type Iterator interface { + // Next retrieves the next document ID in the postings list. + Next() (DocID, error) + // Seek moves the cursor to ID or the closest following one, if it doesn't exist. + // It returns the ID at the position. + Seek(id DocID) (DocID, error) +} + +type mergeIterator struct { + i1, i2 Iterator + v1, v2 DocID + e1, e2 error +} + +func (it *mergeIterator) Next() (DocID, error) { + if it.e1 == io.EOF && it.e2 == io.EOF { + return 0, io.EOF + } + if it.e1 != nil { + if it.e1 != io.EOF { + return 0, it.e1 + } + x := it.v2 + it.v2, it.e2 = it.i2.Next() + return x, nil + } + if it.e2 != nil { + if it.e2 != io.EOF { + return 0, it.e2 + } + x := it.v1 + it.v1, it.e1 = it.i1.Next() + return x, nil + } + if it.v1 < it.v2 { + x := it.v1 + it.v1, it.e1 = it.i1.Next() + return x, nil + } else if it.v2 < it.v1 { + x := it.v2 + it.v2, it.e2 = it.i2.Next() + return x, nil + } else { + x := it.v1 + it.v1, it.e1 = it.i1.Next() + it.v2, it.e2 = it.i2.Next() + return x, nil + } +} + +func (it *mergeIterator) Seek(id DocID) (DocID, error) { + // We just have to advance the first iterator. The next common match is also + // the next seeked ID of the intersection. + it.v1, it.e1 = it.i1.Seek(id) + it.v2, it.e2 = it.i2.Seek(id) + return it.Next() +} + +// Merge returns a new Iterator over the union of the input iterators. +func Merge(its ...Iterator) Iterator { + if len(its) == 0 { + return nil + } + i1 := its[0] + + for _, i2 := range its[1:] { + i1 = &mergeIterator{i1: i1, i2: i2} + } + return i1 +} + +// ExpandIterator walks through the iterator and returns the result list. +// The iterator is closed after completion. 
+func ExpandIterator(it Iterator) ([]DocID, error) { + var ( + res = []DocID{} + v DocID + err error + ) + for v, err = it.Seek(0); err == nil; v, err = it.Next() { + res = append(res, v) + } + if err == io.EOF { + return res, nil + } + return res, err +} + +type intersectIterator struct { + i1, i2 Iterator + v1, v2 DocID + e1, e2 error +} + +// Intersect returns a new Iterator over the intersection of the input iterators. +func Intersect(its ...Iterator) Iterator { + if len(its) == 0 { + return nil + } + i1 := its[0] + + for _, i2 := range its[1:] { + i1 = &intersectIterator{i1: i1, i2: i2} + } + return i1 +} + +func (it *intersectIterator) Next() (DocID, error) { + for { + if it.e1 != nil { + return 0, it.e1 + } + if it.e2 != nil { + return 0, it.e2 + } + if it.v1 < it.v2 { + it.v1, it.e1 = it.i1.Seek(it.v2) + } else if it.v2 < it.v1 { + it.v2, it.e2 = it.i2.Seek(it.v1) + } else { + v := it.v1 + it.v1, it.e1 = it.i1.Next() + it.v2, it.e2 = it.i2.Next() + return v, nil + } + } +} + +func (it *intersectIterator) Seek(id DocID) (DocID, error) { + // We have to advance both iterators. Otherwise, we get a false-positive + // match on 0 if only on of the iterators has it. + it.v1, it.e1 = it.i1.Seek(id) + it.v2, it.e2 = it.i2.Seek(id) + return it.Next() +} + +// A skiplist iterator iterates through a list of value/pointer pairs. +type skiplistIterator interface { + // seek returns the value and pointer at or before v. + seek(v DocID) (val DocID, ptr uint64, err error) + // next returns the next value/pointer pair. + next() (val DocID, ptr uint64, err error) +} + +// iteratorStore allows to retrieve an iterator based on a key. +type iteratorStore interface { + get(uint64) (Iterator, error) +} + +// skippingIterator implements the iterator interface based on skiplist, which +// allows to jump to the iterator closest to the seeked value. +// +// This iterator allows for speed up in seeks if the underlying data cannot +// be searched in O(log n). +// Ideally, the skiplist is seekable in O(log n). +type skippingIterator struct { + skiplist skiplistIterator + iterators iteratorStore + + // The iterator holding the next value. + cur Iterator +} + +// Seek implements the Iterator interface. +func (it *skippingIterator) Seek(id DocID) (DocID, error) { + _, ptr, err := it.skiplist.seek(id) + if err != nil { + return 0, err + } + cur, err := it.iterators.get(ptr) + if err != nil { + return 0, err + } + it.cur = cur + + return it.cur.Seek(id) +} + +// Next implements the Iterator interface. +func (it *skippingIterator) Next() (DocID, error) { + // If next was called initially. + // TODO(fabxc): should this just panic and initial call to seek() be required? + if it.cur == nil { + return it.Seek(0) + } + + if id, err := it.cur.Next(); err == nil { + return id, nil + } else if err != io.EOF { + return 0, err + } + // We reached the end of the current iterator. Get the next iterator through + // our skiplist. + _, ptr, err := it.skiplist.next() + if err != nil { + // Here we return the actual io.EOF if we reached the end of the iterator + // retrieved from the last skiplist entry. + return 0, err + } + // Iterate over the next iterator. + cur, err := it.iterators.get(ptr) + if err != nil { + return 0, err + } + it.cur = cur + + // Return the first value in the new iterator. + return it.cur.Seek(0) +} + +// plainListIterator implements the iterator interface on a sorted list of integers. 
+type plainListIterator struct { + list list + pos int +} + +func newPlainListIterator(l []DocID) *plainListIterator { + it := &plainListIterator{list: list(l)} + sort.Sort(it.list) + return it +} + +func (it *plainListIterator) Seek(id DocID) (DocID, error) { + it.pos = sort.Search(it.list.Len(), func(i int) bool { return it.list[i] >= id }) + return it.Next() + +} + +func (it *plainListIterator) Next() (DocID, error) { + if it.pos >= it.list.Len() { + return 0, io.EOF + } + x := it.list[it.pos] + it.pos++ + return x, nil +} + +type list []DocID + +func (l list) Len() int { return len(l) } +func (l list) Less(i, j int) bool { return l[i] < l[j] } +func (l list) Swap(i, j int) { l[i], l[j] = l[j], l[i] } + +// plainSkiplistIterator implements the skiplistIterator interface on plain +// in-memory mapping. +type plainSkiplistIterator struct { + m map[DocID]uint64 + keys list + pos int +} + +func newPlainSkiplistIterator(m map[DocID]uint64) *plainSkiplistIterator { + var keys list + for k := range m { + keys = append(keys, k) + } + sort.Sort(keys) + + return &plainSkiplistIterator{ + m: m, + keys: keys, + } +} + +// seek implements the skiplistIterator interface. +func (it *plainSkiplistIterator) seek(id DocID) (DocID, uint64, error) { + pos := sort.Search(len(it.keys), func(i int) bool { return it.keys[i] >= id }) + // The skiplist iterator points to the element at or before the last value. + if pos > 0 && it.keys[pos] > id { + it.pos = pos - 1 + } else { + it.pos = pos + } + return it.next() + +} + +// next implements the skiplistIterator interface. +func (it *plainSkiplistIterator) next() (DocID, uint64, error) { + if it.pos >= len(it.keys) { + return 0, 0, io.EOF + } + k := it.keys[it.pos] + it.pos++ + return k, it.m[k], nil +} diff --git a/index/iter_test.go b/index/iter_test.go new file mode 100644 index 0000000000..763db74922 --- /dev/null +++ b/index/iter_test.go @@ -0,0 +1,228 @@ +package index + +import ( + "reflect" + "testing" +) + +func TestMultiIntersect(t *testing.T) { + var cases = []struct { + a, b, c []DocID + res []DocID + }{ + { + a: []DocID{1, 2, 3, 4, 5, 6, 1000, 1001}, + b: []DocID{2, 4, 5, 6, 7, 8, 999, 1001}, + c: []DocID{1, 2, 5, 6, 7, 8, 1001, 1200}, + res: []DocID{2, 5, 6, 1001}, + }, + } + + for _, c := range cases { + i1 := newPlainListIterator(c.a) + i2 := newPlainListIterator(c.b) + i3 := newPlainListIterator(c.c) + + res, err := ExpandIterator(Intersect(i1, i2, i3)) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if !reflect.DeepEqual(res, c.res) { + t.Fatalf("Expected %v but got %v", c.res, res) + } + } +} + +func TestIntersectIterator(t *testing.T) { + var cases = []struct { + a, b []DocID + res []DocID + }{ + { + a: []DocID{1, 2, 3, 4, 5}, + b: []DocID{6, 7, 8, 9, 10}, + res: []DocID{}, + }, + { + a: []DocID{1, 2, 3, 4, 5}, + b: []DocID{4, 5, 6, 7, 8}, + res: []DocID{4, 5}, + }, + { + a: []DocID{1, 2, 3, 4, 9, 10}, + b: []DocID{1, 4, 5, 6, 7, 8, 10, 11}, + res: []DocID{1, 4, 10}, + }, { + a: []DocID{1}, + b: []DocID{0, 1}, + res: []DocID{1}, + }, + } + + for _, c := range cases { + i1 := newPlainListIterator(c.a) + i2 := newPlainListIterator(c.b) + + res, err := ExpandIterator(&intersectIterator{i1: i1, i2: i2}) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if !reflect.DeepEqual(res, c.res) { + t.Fatalf("Expected %v but got %v", c.res, res) + } + } +} + +func TestMergeIntersect(t *testing.T) { + var cases = []struct { + a, b, c []DocID + res []DocID + }{ + { + a: []DocID{1, 2, 3, 4, 5, 6, 1000, 1001}, + b: []DocID{2, 4, 5, 
6, 7, 8, 999, 1001}, + c: []DocID{1, 2, 5, 6, 7, 8, 1001, 1200}, + res: []DocID{1, 2, 3, 4, 5, 6, 7, 8, 999, 1000, 1001, 1200}, + }, + } + + for _, c := range cases { + i1 := newPlainListIterator(c.a) + i2 := newPlainListIterator(c.b) + i3 := newPlainListIterator(c.c) + + res, err := ExpandIterator(Merge(i1, i2, i3)) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if !reflect.DeepEqual(res, c.res) { + t.Fatalf("Expected %v but got %v", c.res, res) + } + } +} + +func BenchmarkIntersect(t *testing.B) { + var a, b, c, d []DocID + + for i := 0; i < 10000000; i += 2 { + a = append(a, DocID(i)) + } + for i := 5000000; i < 5000100; i += 4 { + b = append(b, DocID(i)) + } + for i := 5090000; i < 5090600; i += 4 { + b = append(b, DocID(i)) + } + for i := 4990000; i < 5100000; i++ { + c = append(c, DocID(i)) + } + for i := 4000000; i < 6000000; i++ { + d = append(d, DocID(i)) + } + + i1 := newPlainListIterator(a) + i2 := newPlainListIterator(b) + i3 := newPlainListIterator(c) + i4 := newPlainListIterator(d) + + t.ResetTimer() + + for i := 0; i < t.N; i++ { + if _, err := ExpandIterator(Intersect(i1, i2, i3, i4)); err != nil { + t.Fatal(err) + } + } +} + +func TestMergeIterator(t *testing.T) { + var cases = []struct { + a, b []DocID + res []DocID + }{ + { + a: []DocID{1, 2, 3, 4, 5}, + b: []DocID{6, 7, 8, 9, 10}, + res: []DocID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + }, + { + a: []DocID{1, 2, 3, 4, 5}, + b: []DocID{4, 5, 6, 7, 8}, + res: []DocID{1, 2, 3, 4, 5, 6, 7, 8}, + }, + { + a: []DocID{1, 2, 3, 4, 9, 10}, + b: []DocID{1, 4, 5, 6, 7, 8, 10, 11}, + res: []DocID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, + }, + } + + for _, c := range cases { + i1 := newPlainListIterator(c.a) + i2 := newPlainListIterator(c.b) + + res, err := ExpandIterator(&mergeIterator{i1: i1, i2: i2}) + if err != nil { + t.Fatalf("Unexpected error: %s", err) + } + if !reflect.DeepEqual(res, c.res) { + t.Fatalf("Expected %v but got %v", c.res, res) + } + } +} + +func TestSkippingIterator(t *testing.T) { + var cases = []struct { + skiplist skiplistIterator + its iteratorStore + res []DocID + }{ + { + skiplist: newPlainSkiplistIterator(map[DocID]uint64{ + 5: 3, + 50: 2, + 500: 1, + }), + its: testIteratorStore{ + 3: newPlainListIterator(list{5, 7, 8, 9}), + 2: newPlainListIterator(list{54, 60, 61}), + 1: newPlainListIterator(list{1200, 1300, 100000}), + }, + res: []DocID{5, 7, 8, 9, 54, 60, 61, 1200, 1300, 100000}, + }, + { + skiplist: newPlainSkiplistIterator(map[DocID]uint64{ + 0: 3, + 50: 2, + }), + its: testIteratorStore{ + 3: newPlainListIterator(list{5, 7, 8, 9}), + 2: newPlainListIterator(list{54, 60, 61}), + }, + res: []DocID{5, 7, 8, 9, 54, 60, 61}, + }, + } + + for _, c := range cases { + it := &skippingIterator{ + skiplist: c.skiplist, + iterators: c.its, + } + res, err := ExpandIterator(it) + if err != nil { + t.Fatalf("Unexpected error", err) + } + if !reflect.DeepEqual(res, c.res) { + t.Fatalf("Expected %v but got %v", c.res, res) + } + } +} + +type testIteratorStore map[uint64]Iterator + +func (s testIteratorStore) get(id uint64) (Iterator, error) { + it, ok := s[id] + if !ok { + return nil, errNotFound + } + return it, nil +} diff --git a/index/page.go b/index/page.go new file mode 100644 index 0000000000..868d7ba148 --- /dev/null +++ b/index/page.go @@ -0,0 +1,108 @@ +package index + +import ( + "encoding/binary" + "errors" + "io" +) + +const pageSize = 2048 + +var errPageFull = errors.New("page full") + +type pageCursor interface { + Iterator + append(v DocID) error +} + +type page interface { + cursor() 
pageCursor + init(v DocID) error + data() []byte +} + +type pageDelta struct { + b []byte +} + +type pageType uint8 + +const ( + pageTypeDelta pageType = iota +) + +func newPageDelta(data []byte) *pageDelta { + return &pageDelta{b: data} +} + +func (p *pageDelta) init(v DocID) error { + // Write first value. + binary.PutUvarint(p.b, uint64(v)) + return nil +} + +func (p *pageDelta) cursor() pageCursor { + return &pageDeltaCursor{data: p.b} +} + +func (p *pageDelta) data() []byte { + return p.b +} + +type pageDeltaCursor struct { + data []byte + pos int + cur DocID +} + +func (p *pageDeltaCursor) append(id DocID) error { + // Run to the end. + _, err := p.Next() + for ; err == nil; _, err = p.Next() { + // Consume. + } + if err != io.EOF { + return err + } + if len(p.data)-p.pos < binary.MaxVarintLen64 { + return errPageFull + } + if p.cur >= id { + return errOutOfOrder + } + p.pos += binary.PutUvarint(p.data[p.pos:], uint64(id-p.cur)) + p.cur = id + return nil +} + +func (p *pageDeltaCursor) Close() error { + return nil +} + +func (p *pageDeltaCursor) Seek(min DocID) (v DocID, err error) { + if min < p.cur { + p.pos = 0 + } + for v, err = p.Next(); err == nil && v < min; v, err = p.Next() { + // Consume. + } + return p.cur, err +} + +func (p *pageDeltaCursor) Next() (DocID, error) { + var n int + var dv uint64 + if p.pos == 0 { + dv, n = binary.Uvarint(p.data) + p.cur = DocID(dv) + } else { + dv, n = binary.Uvarint(p.data[p.pos:]) + if n <= 0 || dv == 0 { + return 0, io.EOF + } + p.cur += DocID(dv) + } + p.pos += n + + return p.cur, nil +} diff --git a/index/page_test.go b/index/page_test.go new file mode 100644 index 0000000000..a3f17bada3 --- /dev/null +++ b/index/page_test.go @@ -0,0 +1,116 @@ +package index + +import ( + "math/rand" + "reflect" + "testing" +) + +func TestPageDelta(t *testing.T) { + var ( + vals []DocID + last DocID + ) + for i := 0; i < 10000; i++ { + vals = append(vals, last) + last += DocID(rand.Int63n(1<<9) + 1) + } + data := make([]byte, pageSize) + page := newPageDelta(data) + + if err := page.init(vals[0]); err != nil { + t.Fatal(err) + } + + var num int + pc := page.cursor() + + for _, v := range vals[1:] { + if err := pc.append(v); err != nil { + if err == errPageFull { + break + } + t.Fatal(err) + } + num++ + } + + res, err := ExpandIterator(pc) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(res, vals[:num+1]) { + t.Errorf("output did not match") + t.Errorf("expected: %v", vals[:num+1]) + t.Errorf("received: %v", res) + } +} + +func BenchmarkPageDeltaAppend(b *testing.B) { + var ( + vals []DocID + last DocID + ) + for i := 0; i < 10000; i++ { + vals = append(vals, last) + last += DocID(rand.Int63n(1<<10) + 1) + } + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + data := make([]byte, pageSize) + page := newPageDelta(data) + + if err := page.init(vals[0]); err != nil { + b.Fatal(err) + } + + pc := page.cursor() + + for _, v := range vals[1:] { + if err := pc.append(v); err != nil { + if err == errPageFull { + break + } + b.Fatal(err) + } + } + } +} + +func BenchmarkPageDeltaRead(b *testing.B) { + var ( + vals []DocID + last DocID + ) + for i := 0; i < 10000; i++ { + vals = append(vals, last) + last += DocID(rand.Int63n(1<<10) + 1) + } + data := make([]byte, pageSize) + page := newPageDelta(data) + + if err := page.init(vals[0]); err != nil { + b.Fatal(err) + } + + pc := page.cursor() + + for _, v := range vals[1:] { + if err := pc.append(v); err != nil { + if err == errPageFull { + break + } + b.Fatal(err) + } + } + + b.ResetTimer() + + for i := 
0; i < b.N; i++ { + if _, err := ExpandIterator(pc); err != nil { + b.Fatal(err) + } + } +} diff --git a/index/postings.go b/index/postings.go new file mode 100644 index 0000000000..05f8c91cad --- /dev/null +++ b/index/postings.go @@ -0,0 +1,72 @@ +package index + +import ( + "io" + + "github.com/boltdb/bolt" +) + +type iteratorStoreFunc func(k uint64) (Iterator, error) + +func (s iteratorStoreFunc) get(k uint64) (Iterator, error) { + return s(k) +} + +// boltSkiplistCursor implements the skiplistCurosr interface. +// +// TODO(fabxc): benchmark the overhead of a bucket per key. +// It might be more performant to have all skiplists in the same bucket. +// +// 20k keys, ~10 skiplist entries avg -> 200k keys, 1 bucket vs 20k buckets, 10 keys +// +type boltSkiplistCursor struct { + // k is currently unused. If the bucket holds entries for more than + // just a single key, it will be necessary. + k uint64 + c *bolt.Cursor + bkt *bolt.Bucket +} + +func (s *boltSkiplistCursor) next() (DocID, uint64, error) { + db, pb := s.c.Next() + if db == nil { + return 0, 0, io.EOF + } + return newDocID(db), decodeUint64(pb), nil +} + +func (s *boltSkiplistCursor) seek(k DocID) (DocID, uint64, error) { + db, pb := s.c.Seek(k.bytes()) + if db == nil { + db, pb = s.c.Last() + if db == nil { + return 0, 0, io.EOF + } + } + did, pid := newDocID(db), decodeUint64(pb) + + if did > k { + // If the found entry is behind the seeked ID, try the previous + // entry if it exists. The page it points to contains the range of k. + dbp, pbp := s.c.Prev() + if dbp != nil { + did, pid = newDocID(dbp), decodeUint64(pbp) + } else { + // We skipped before the first entry. The cursor is now out of + // state and subsequent calls to Next() will return nothing. + // Reset it to the first position. + s.c.First() + } + } + return did, pid, nil +} + +func (s *boltSkiplistCursor) append(d DocID, p uint64) error { + k, _ := s.c.Last() + + if k != nil && decodeUint64(k) >= uint64(d) { + return errOutOfOrder + } + + return s.bkt.Put(encodeUint64(uint64(d)), encodeUint64(p)) +} diff --git a/pages b/pages deleted file mode 160000 index 76be68c231..0000000000 --- a/pages +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 76be68c231de3265e6d0ee670fe312f80b86b99a diff --git a/pages/README.md b/pages/README.md new file mode 100644 index 0000000000..ecdf0fa1ca --- /dev/null +++ b/pages/README.md @@ -0,0 +1,5 @@ +# pages + +Pages stores pages of blob data. It is essentially a minimal version of +BoltDB, where the the B+ tree was removed and replaced by simply writing +page-aligned byte slices. diff --git a/pages/db.go b/pages/db.go new file mode 100644 index 0000000000..08a2662702 --- /dev/null +++ b/pages/db.go @@ -0,0 +1,782 @@ +package pages + +import ( + "errors" + "fmt" + "hash/fnv" + "math" + "os" + "runtime" + "sync" + "time" + "unsafe" +) + +// These errors can be returned when opening or calling methods on a DB. +var ( + // ErrDatabaseNotOpen is returned when a DB instance is accessed before it + // is opened or after it is closed. + ErrDatabaseNotOpen = errors.New("database not open") + + // ErrDatabaseOpen is returned when opening a database that is + // already open. + ErrDatabaseOpen = errors.New("database already open") + + // ErrInvalid is returned when both meta pages on a database are invalid. + // This typically occurs when a file is not a bolt database. + ErrInvalid = errors.New("invalid database") + + // ErrVersionMismatch is returned when the data file was created with a + // different version of Bolt. 
+ ErrVersionMismatch = errors.New("version mismatch") + + // ErrChecksum is returned when either meta page checksum does not match. + ErrChecksum = errors.New("checksum error") + + // ErrTimeout is returned when a database cannot obtain an exclusive lock + // on the data file after the timeout passed to Open(). + ErrTimeout = errors.New("timeout") + + // ErrNotFound is returned when a user page for an ID could not be found. + ErrNotFound = errors.New("not found") + + ErrTxClosed = errors.New("transaction closed") + + ErrTxNotWritable = errors.New("transaction not writable") +) + +// Marker value that indicates that a file is a pagebuf file. +const magic uint32 = 0xAFFEAFFE + +// The data file version. +const version = 1 + +// The largest step that can be taken when remapping the mmap. +const maxMmapStep = 1 << 30 // 1GB + +// defaultPageSize of the underlying buffers is set to the OS page size. +var defaultPageSize = os.Getpagesize() + +// DB is an interface providing access to persistent byte chunks that +// are backed by memory-mapped pages. +type DB struct { + // If you want to read the entire database fast, you can set MmapFlag to + // syscall.MAP_POPULATE on Linux 2.6.23+ for sequential read-ahead. + MmapFlags int + + // AllocSize is the amount of space allocated when the database + // needs to create new pages. This is done to amortize the cost + // of truncate() and fsync() when growing the data file. + AllocSize int + + path string // location of the pagebuf file + file *os.File // the opened file of path + opened bool + data *[maxMapSize]byte + dataref []byte // mmap'ed readonly, write throws SEGV + datasz int + filesz int // current on disk file size + pageSize int + meta0 *meta + meta1 *meta + freelist *freelist + rwtx *Tx + txs []*Tx + + pagePool sync.Pool + + rwlock sync.Mutex // Allows only one writer at a time. + metalock sync.Mutex // Protects meta page access. + mmaplock sync.RWMutex // Protects mmap access during remapping + + ops struct { + writeAt func(b []byte, off int64) (n int, err error) + } +} + +// Options defines configuration parameters with which a PageBuf is initialized. +type Options struct { + // Timeout is the amount of time to wait to obtain a file lock. + // When set to zero it will wait indefinitely. This option is only + // available on Darwin and Linux. + Timeout time.Duration + + // Sets the DB.MmapFlags flag before memory mapping the file. + MmapFlags int + + // XXX(fabxc): potentially allow setting different allocation strategies + // to fit different use cases. + + // InitialMmapSize is the initial mmap size of the database + // in bytes. + // + // If <=0, the initial map size is 0. + // If initialMmapSize is smaller than the previous database size, + // it takes no effect. + InitialMmapSize int + + // PageSize defines a custom page size used. It cannot be changed later. + // Must be a multiple of the operating system's default page size. + PageSize int +} + +// DefaultOptions specifies a set of default parameters used when a pagebuf +// is opened without explicit options. +var DefaultOptions = Options{ + // Use the OS's default page size. + PageSize: defaultPageSize, +} + +// Default values if not set in a DB instance. +const ( + DefaultAllocSize = 16 * 1024 * 1024 +) + +// Open and create a new database under the given path. +func Open(path string, mode os.FileMode, o *Options) (*DB, error) { + db := &DB{ + opened: true, + } + + // Set default options if no options are provided. 
+	if o == nil {
+		o = &DefaultOptions
+	}
+	db.MmapFlags = o.MmapFlags
+
+	db.AllocSize = DefaultAllocSize
+
+	flag := os.O_RDWR
+
+	// Open data file and separate sync handler for metadata writes.
+	db.path = path
+	var err error
+	if db.file, err = os.OpenFile(db.path, flag|os.O_CREATE, mode); err != nil {
+		_ = db.close()
+		return nil, err
+	}
+
+	// Lock file so that other processes using pagebuf in read-write mode cannot
+	// use the underlying data at the same time.
+	if err := flock(db, mode, true, o.Timeout); err != nil {
+		_ = db.close()
+		return nil, err
+	}
+
+	// Default values for test hooks.
+	db.ops.writeAt = db.file.WriteAt
+
+	// Initialize the database if it doesn't exist.
+	if info, err := db.file.Stat(); err != nil {
+		_ = db.close()
+		return nil, err
+	} else if info.Size() == 0 {
+		// Initialize new files with meta pages.
+		if err := db.init(o.PageSize); err != nil {
+			return nil, err
+		}
+	} else {
+		// Read the first meta page to determine the page size.
+		var buf [0x1000]byte
+		if _, err := db.file.ReadAt(buf[:], 0); err == nil {
+			m := db.pageInBuffer(buf[:], 0).meta()
+			if err := m.validate(); err != nil {
+				// We cannot verify which page size was used.
+				return nil, fmt.Errorf("cannot read page size: %s", err)
+			}
+			db.pageSize = int(m.pageSize)
+		} else {
+			return nil, fmt.Errorf("reading first meta page failed: %s", err)
+		}
+	}
+
+	// Initialize page pool.
+	db.pagePool = sync.Pool{
+		New: func() interface{} {
+			return make([]byte, db.pageSize)
+		},
+	}
+
+	// Memory map the data file.
+	if err := db.mmap(o.InitialMmapSize); err != nil {
+		_ = db.close()
+		return nil, err
+	}
+
+	// Read in the freelist.
+	db.freelist = newFreelist()
+	db.freelist.read(db.page(db.meta().freelist))
+
+	// Mark the database as opened and return.
+	return db, nil
+}
+
+func validatePageSize(psz int) error {
+	// The page size must fit into the uint16 content length of a page.
+	if psz > math.MaxUint16 {
+		return fmt.Errorf("invalid page size %d", psz)
+	}
+	// Page size must be a multiple of the OS page size so we stay
+	// page aligned.
+	if psz < defaultPageSize {
+		if defaultPageSize%psz != 0 {
+			return fmt.Errorf("invalid page size %d", psz)
+		}
+	} else if psz > defaultPageSize {
+		if psz%defaultPageSize != 0 {
+			return fmt.Errorf("invalid page size %d", psz)
+		}
+	}
+	return nil
+}
+
+// init creates a new database file and initializes its meta pages.
+func (db *DB) init(psz int) error {
+	if err := validatePageSize(psz); err != nil {
+		return err
+	}
+	// Set the page size to the validated size.
+	db.pageSize = psz
+
+	// Create two meta pages on a buffer.
+	buf := make([]byte, db.pageSize*4)
+	for i := 0; i < 2; i++ {
+		p := db.pageInBuffer(buf[:], pgid(i))
+		p.id = pgid(i)
+		p.flags = pageFlagMeta
+
+		// Initialize the meta page.
+		m := p.meta()
+		m.magic = magic
+		m.version = version
+		m.pageSize = uint32(db.pageSize)
+		m.freelist = 2
+		m.txid = txid(i)
+		m.pgid = 4 // TODO(fabxc): we initialize with zero pages, what to do here?
+		m.checksum = m.sum64()
+	}
+
+	// Write an empty freelist at page 2.
+	p := db.pageInBuffer(buf[:], pgid(2))
+	p.id = pgid(2)
+	p.flags = pageFlagFreelist
+	p.count = 0
+
+	// Write the first empty data page at page 3.
+	p = db.pageInBuffer(buf[:], pgid(3))
+	p.id = pgid(3)
+	p.flags = pageFlagData
+	p.count = 0
+
+	// Write the buffer to our data file.
+	if _, err := db.ops.writeAt(buf, 0); err != nil {
+		return err
+	}
+	if err := fdatasync(db); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// Sync executes fdatasync() against the database file handle.
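+// On Linux this maps to fdatasync(2) (see pages_linux.go); on the other Unix
+// platforms covered by pagessync_unix.go it falls back to File.Sync.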
+func (db *DB) Sync() error { return fdatasync(db) } + +// Close synchronizes and closes the memory-mapped pagebuf file. +func (db *DB) Close() error { + db.rwlock.Lock() + defer db.rwlock.Unlock() + + db.metalock.Lock() + defer db.metalock.Unlock() + + db.mmaplock.RLock() + defer db.mmaplock.RUnlock() + + return db.close() +} + +func (db *DB) close() error { + if !db.opened { + return nil + } + + db.opened = false + db.freelist = nil + db.ops.writeAt = nil + + // Close the mmap. + if err := db.munmap(); err != nil { + return err + } + + // Close file handles. + if db.file != nil { + // Close the file descriptor. + if err := db.file.Close(); err != nil { + return fmt.Errorf("db file close: %s", err) + } + db.file = nil + } + + db.path = "" + return nil +} + +// Update executes a function within the context of a read-write managed transaction. +// If no error is returned from the function then the transaction is committed. +// If an error is returned then the entire transaction is rolled back. +// Any error that is returned from the function or returned from the commit is +// returned from the Update() method. +// +// Attempting to manually commit or rollback within the function will cause a panic. +func (db *DB) Update(fn func(*Tx) error) error { + t, err := db.Begin(true) + if err != nil { + return err + } + + // Make sure the transaction rolls back in the event of a panic. + defer func() { + if t.db != nil { + t.rollback() + } + }() + + // Mark as a managed tx so that the inner function cannot manually commit. + t.managed = true + + // If an error is returned from the function then rollback and return error. + err = fn(t) + t.managed = false + if err != nil { + _ = t.Rollback() + return err + } + + return t.Commit() +} + +// View executes a function within the context of a managed read-only transaction. +// Any error that is returned from the function is returned from the View() method. +// +// Attempting to manually rollback within the function will cause a panic. +func (db *DB) View(fn func(*Tx) error) error { + t, err := db.Begin(false) + if err != nil { + return err + } + + // Make sure the transaction rolls back in the event of a panic. + defer func() { + if t.db != nil { + t.rollback() + } + }() + + // Mark as a managed tx so that the inner function cannot manually rollback. + t.managed = true + + // If an error is returned from the function then pass it through. + err = fn(t) + t.managed = false + if err != nil { + _ = t.Rollback() + return err + } + + if err := t.Rollback(); err != nil { + return err + } + + return nil +} + +// pageExists checks whether the page with the given id exists. +func (db *DB) pageExists(id pgid) bool { + // The page exists if it is not in the freelist or out of the data range. + return !db.freelist.cache[pgid(id)] && int(id+1)*db.pageSize < db.datasz +} + +// page retrieves a page reference from the mmap based on the current page size. +func (db *DB) page(id pgid) *page { + pos := id * pgid(db.pageSize) + return (*page)(unsafe.Pointer(&db.data[pos])) +} + +// pageInBuffer retrieves a page reference from a given byte array based on the current +// page size. +func (db *DB) pageInBuffer(b []byte, id pgid) *page { + pos := id * pgid(db.pageSize) + return (*page)(unsafe.Pointer(&b[pos])) +} + +// meta retrieves the current meta page reference. +func (db *DB) meta() *meta { + // We have to return the meta with the highest txid which doesn't fail + // validation. Otherwise, we can cause errors when in fact the database is + // in a consistent state. 
metaA is the one with the higher txid. + metaA := db.meta0 + metaB := db.meta1 + if db.meta1.txid > db.meta0.txid { + metaA = db.meta1 + metaB = db.meta0 + } + + // Use higher meta page if valid. Otherwise fallback to previous, if valid. + if err := metaA.validate(); err == nil { + return metaA + } else if err := metaB.validate(); err == nil { + return metaB + } + + // This should never be reached, because both meta1 and meta0 were validated + // on mmap() and we do fsync() on every write. + panic("pagebuf.PageBuf.meta(): invalid meta pages") +} + +// allocate returns a contiguous block of memory starting at a given page. +func (db *DB) allocate(count int) (*page, error) { + // Allocate a temporary buffer for the page. + var buf []byte + if count == 1 { + buf = db.pagePool.Get().([]byte) + } else { + buf = make([]byte, count*db.pageSize) + } + p := (*page)(unsafe.Pointer(&buf[0])) + p.overflow = uint32(count - 1) + + // Use pages from the freelist if they are available. + if p.id = db.freelist.allocate(count); p.id != 0 { + return p, nil + } + + // Resize mmap() if we're at the end. + p.id = db.rwtx.meta.pgid + var minsz = int((p.id+pgid(count))+1) * db.pageSize + if minsz >= db.datasz { + if err := db.mmap(minsz); err != nil { + return nil, fmt.Errorf("mmap allocate error: %s", err) + } + } + + // Move the page id high water mark. + db.rwtx.meta.pgid += pgid(count) + return p, nil +} + +// grow grows the size of the database to the given sz. +func (db *DB) grow(sz int) error { + // Ignore if the new size is less than available file size. + if sz <= db.filesz { + return nil + } + + // If the data is smaller than the alloc size then only allocate what's needed. + // Once it goes over the allocation size then allocate in chunks. + if db.datasz < db.AllocSize { + sz = db.datasz + } else { + sz += db.AllocSize + } + + // Truncate and fsync to ensure file size metadata is flushed. + // https://github.com/boltdb/bolt/issues/284 + if runtime.GOOS != "windows" { + if err := db.file.Truncate(int64(sz)); err != nil { + return fmt.Errorf("file resize error: %s", err) + } + } + if err := db.file.Sync(); err != nil { + return fmt.Errorf("file sync error: %s", err) + } + + db.filesz = sz + return nil +} + +// mmap opens the underlying memory-mapped file and initializes it. +// minsz is the minimum size that the mmap can be. +func (db *DB) mmap(minsz int) error { + db.mmaplock.Lock() + defer db.mmaplock.Unlock() + + info, err := db.file.Stat() + if err != nil { + return fmt.Errorf("mmap stat error: %s", err) + } else if int(info.Size()) < db.pageSize*2 { + return fmt.Errorf("file size too small") + } + + // Ensure the size is at least the minimum size. + var size = int(info.Size()) + if size < minsz { + size = minsz + } + size, err = db.mmapSize(size) + if err != nil { + return err + } + + // Unmap existing data before continuing. + if err := db.munmap(); err != nil { + return err + } + // Memory-map the data file as a byte slice. + if err := mmap(db, size); err != nil { + return err + } + + // Save references to the meta pages. + db.meta0 = db.page(0).meta() + db.meta1 = db.page(1).meta() + + // Validate the meta pages. We only return an error if both meta pages fail + // validation, since meta0 failing validation means that it wasn't saved + // properly -- but we can recover using meta1. And vice-versa. + err0 := db.meta0.validate() + err1 := db.meta1.validate() + if err0 != nil && err1 != nil { + return err0 + } + return nil +} + +// munmap unmaps the data file from memory. 
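+// It is a no-op when no mapping is active (the munmap helper in
+// pages_unix.go returns early if dataref is unset).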
+func (db *DB) munmap() error {
+	if err := munmap(db); err != nil {
+		return fmt.Errorf("unmap error: %s", err)
+	}
+	return nil
+}
+
+// mmapSize determines the appropriate size for the mmap given the current size
+// of the database. The minimum size is 32KB and doubles until it reaches 1GB.
+// Returns an error if the new mmap size is greater than the max allowed.
+func (db *DB) mmapSize(size int) (int, error) {
+	// Double the size from 32KB until 1GB.
+	for i := uint(15); i <= 30; i++ {
+		if size <= 1<<i {
+			return 1 << i, nil
+		}
+	}
+
+	// Verify the requested size is not above the maximum allowed.
+	if size > maxMapSize {
+		return 0, fmt.Errorf("mmap too large")
+	}
+
+	// If larger than 1GB then grow by 1GB at a time.
+	sz := int64(size)
+	if remainder := sz % int64(maxMmapStep); remainder > 0 {
+		sz += int64(maxMmapStep) - remainder
+	}
+
+	// Ensure that the mmap size is a multiple of the page size.
+	// This should always be true since we're incrementing in 1GB steps.
+	pageSize := int64(db.pageSize)
+	if (sz % pageSize) != 0 {
+		sz = ((sz / pageSize) + 1) * pageSize
+	}
+
+	// If we've exceeded the max size then only grow up to the max size.
+	if sz > maxMapSize {
+		sz = maxMapSize
+	}
+
+	return int(sz), nil
+}
+
+func (db *DB) String() string {
+	return fmt.Sprintf("PageBuf<%s>", db.path)
+}
+
+// Path returns the path to the currently opened pagebuf file.
+func (db *DB) Path() string {
+	return db.path
+}
+
+// Begin starts a new transaction.
+// Multiple read-only transactions can be used concurrently but only one
+// write transaction can be used at a time. Starting multiple write transactions
+// will cause the calls to block and be serialized until the current write
+// transaction finishes.
+//
+// Transactions should not be dependent on one another. Opening a read
+// transaction and a write transaction in the same goroutine can cause the
+// writer to deadlock because the database periodically needs to re-mmap itself
+// as it grows and it cannot do that while a read transaction is open.
+//
+// If a long running read transaction (for example, a snapshot transaction) is
+// needed, you might want to set Options.InitialMmapSize to a large enough value
+// to avoid potential blocking of write transactions.
+//
+// IMPORTANT: You must close read-only transactions after you are finished or
+// else the database will not reclaim old pages.
+func (db *DB) Begin(writable bool) (*Tx, error) {
+	if writable {
+		return db.beginRWTx()
+	}
+	return db.beginTx()
+}
+
+func (db *DB) beginTx() (*Tx, error) {
+	// Lock the meta pages while we initialize the transaction. We obtain
+	// the meta lock before the mmap lock because that's the order that the
+	// write transaction will obtain them.
+	db.metalock.Lock()
+
+	// Obtain a read-only lock on the mmap. When the mmap is remapped it will
+	// obtain a write lock so all transactions must finish before it can be
+	// remapped.
+	db.mmaplock.RLock()
+
+	// Exit if the database is not open yet.
+	if !db.opened {
+		db.mmaplock.RUnlock()
+		db.metalock.Unlock()
+		return nil, ErrDatabaseNotOpen
+	}
+
+	// Create a transaction associated with the database.
+	t := &Tx{}
+	t.init(db)
+
+	// Keep track of transaction until it closes.
+	db.txs = append(db.txs, t)
+
+	// Unlock the meta pages.
+	db.metalock.Unlock()
+
+	return t, nil
+}
+
+func (db *DB) beginRWTx() (*Tx, error) {
+	// Obtain writer lock. This is released by the transaction when it closes.
+	// This enforces only one writer transaction at a time.
+	db.rwlock.Lock()
+
+	// Once we have the writer lock then we can lock the meta pages so that
+	// we can set up the transaction.
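+	// (The meta lock is released again before this function returns; the
+	// rwlock is held until the transaction closes.)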
+ db.metalock.Lock() + defer db.metalock.Unlock() + + // Exit if the database is not open yet. + if !db.opened { + db.rwlock.Unlock() + return nil, ErrDatabaseNotOpen + } + + // Create a transaction associated with the database. + t := &Tx{writable: true} + t.init(db) + db.rwtx = t + + // Free any pages associated with closed read-only transactions. + var minid txid = 0xFFFFFFFFFFFFFFFF + for _, t := range db.txs { + if t.meta.txid < minid { + minid = t.meta.txid + } + } + if minid > 0 { + db.freelist.release(minid - 1) + } + + return t, nil +} + +// removeTx removes a transaction from the database. +func (db *DB) removeTx(tx *Tx) { + // Release the read lock on the mmap. + db.mmaplock.RUnlock() + + // Use the meta lock to restrict access to the DB object. + db.metalock.Lock() + + // Remove the transaction. + for i, t := range db.txs { + if t == tx { + db.txs = append(db.txs[:i], db.txs[i+1:]...) + break + } + } + // Unlock the meta pages. + db.metalock.Unlock() +} + +// Size represents a valid page size. +type Size int8 + +// The valid sizes for allocated pages. +const ( + Size512 Size = -3 + Size1024 = -2 + Size2048 = -1 + Size4096 = 0 + Size8192 = 1 +) + +const ( + upageSizeMin = Size512 + upageSizeMax = Size8192 +) + +type meta struct { + magic uint32 + version uint32 + pageSize uint32 + flags uint32 + freelist pgid + txid txid + pgid pgid + checksum uint64 +} + +// validate checks the marker bytes and version of the meta page to ensure it matches this binary. +func (m *meta) validate() error { + if m.magic != magic { + return ErrInvalid + } else if m.version != version { + return ErrVersionMismatch + } else if m.checksum != 0 && m.checksum != m.sum64() { + return ErrChecksum + } + return nil +} + +// copy copies one meta object to another. +func (m *meta) copy(dest *meta) { + *dest = *m +} + +// write writes the meta onto a page. +func (m *meta) write(p *page) { + if m.freelist >= m.pgid { + panic(fmt.Sprintf("freelist pgid (%d) above high water mark (%d)", m.freelist, m.pgid)) + } + + // Page id is either going to be 0 or 1 which we can determine by the transaction ID. + p.id = pgid(m.txid % 2) + p.flags |= pageFlagMeta + + // Calculate the checksum. + m.checksum = m.sum64() + + m.copy(p.meta()) +} + +// generates the checksum for the meta. +func (m *meta) sum64() uint64 { + var h = fnv.New64a() + _, _ = h.Write((*[unsafe.Offsetof(meta{}.checksum)]byte)(unsafe.Pointer(m))[:]) + return h.Sum64() +} + +// _assert will panic with a given formatted message if the given condition is false. +func _assert(condition bool, msg string, v ...interface{}) { + if !condition { + panic(fmt.Sprintf("assertion failed: "+msg, v...)) + } +} diff --git a/pages/freelist.go b/pages/freelist.go new file mode 100644 index 0000000000..0f87f3acdb --- /dev/null +++ b/pages/freelist.go @@ -0,0 +1,248 @@ +package pages + +import ( + "fmt" + "sort" + "unsafe" +) + +// freelist represents a list of all pages that are available for allocation. +// It also tracks pages that have been freed but are still in use by open transactions. +type freelist struct { + ids []pgid // all free and available free page ids. + pending map[txid][]pgid // mapping of soon-to-be free page ids by tx. + cache map[pgid]bool // fast lookup of all free and pending page ids. +} + +// newFreelist returns an empty, initialized freelist. +func newFreelist() *freelist { + return &freelist{ + pending: make(map[txid][]pgid), + cache: make(map[pgid]bool), + } +} + +// size returns the size of the page after serialization. 
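+// That is the page header plus one 8-byte pgid for every free and pending id.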
+func (f *freelist) size() int { + return PageHeaderSize + (int(unsafe.Sizeof(pgid(0))) * f.count()) +} + +// count returns count of pages on the freelist +func (f *freelist) count() int { + return f.free_count() + f.pending_count() +} + +// free_count returns count of free pages +func (f *freelist) free_count() int { + return len(f.ids) +} + +// pending_count returns count of pending pages +func (f *freelist) pending_count() int { + var count int + for _, list := range f.pending { + count += len(list) + } + return count +} + +// all returns a list of all free ids and all pending ids in one sorted list. +func (f *freelist) all() []pgid { + m := make(pgids, 0) + + for _, list := range f.pending { + m = append(m, list...) + } + + sort.Sort(m) + return pgids(f.ids).merge(m) +} + +// allocate returns the starting page id of a contiguous list of pages of a given size. +// If a contiguous block cannot be found then 0 is returned. +func (f *freelist) allocate(n int) pgid { + if len(f.ids) == 0 { + return 0 + } + + var initial, previd pgid + for i, id := range f.ids { + if id <= 1 { + panic(fmt.Sprintf("invalid page allocation: %d", id)) + } + + // Reset initial page if this is not contiguous. + if previd == 0 || id-previd != 1 { + initial = id + } + + // If we found a contiguous block then remove it and return it. + if (id-initial)+1 == pgid(n) { + // If we're allocating off the beginning then take the fast path + // and just adjust the existing slice. This will use extra memory + // temporarily but the append() in free() will realloc the slice + // as is necessary. + if (i + 1) == n { + f.ids = f.ids[i+1:] + } else { + copy(f.ids[i-n+1:], f.ids[i+1:]) + f.ids = f.ids[:len(f.ids)-n] + } + + // Remove from the free cache. + for i := pgid(0); i < pgid(n); i++ { + delete(f.cache, initial+i) + } + + return initial + } + + previd = id + } + return 0 +} + +// free releases a page and its overflow for a given transaction id. +// If the page is already free then a panic will occur. +func (f *freelist) free(txid txid, p *page) { + if p.id <= 1 { + panic(fmt.Sprintf("cannot free page 0 or 1: %d", p.id)) + } + + // Free page and all its overflow pages. + var ids = f.pending[txid] + for id := p.id; id <= p.id+pgid(p.overflow); id++ { + // Verify that page is not already free. + if f.cache[id] { + panic(fmt.Sprintf("page %d already freed", id)) + } + + // Add to the freelist and cache. + ids = append(ids, id) + f.cache[id] = true + } + f.pending[txid] = ids +} + +// release moves all page ids for a transaction id (or older) to the freelist. +func (f *freelist) release(txid txid) { + m := make(pgids, 0) + for tid, ids := range f.pending { + if tid <= txid { + // Move transaction's pending pages to the available freelist. + // Don't remove from the cache since the page is still free. + m = append(m, ids...) + delete(f.pending, tid) + } + } + sort.Sort(m) + f.ids = pgids(f.ids).merge(m) +} + +// rollback removes the pages from a given pending tx. +func (f *freelist) rollback(txid txid) { + // Remove page ids from cache. + for _, id := range f.pending[txid] { + delete(f.cache, id) + } + + // Remove pages from pending list. + delete(f.pending, txid) +} + +// freed returns whether a given page is in the free list. +func (f *freelist) freed(pgid pgid) bool { + return f.cache[pgid] +} + +// read initializes the freelist from a freelist page. 
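+// On-disk layout (mirroring write below): the ids form a flat pgid array
+// after the page header; if there are 0xFFFF or more, p.count is set to
+// 0xFFFF and the first array element holds the actual count.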
+func (f *freelist) read(p *page) {
+	// If the page.count is at the max uint16 value (64k) then it's considered
+	// an overflow and the size of the freelist is stored as the first element.
+	idx, count := 0, int(p.count)
+	if count == 0xFFFF {
+		idx = 1
+		count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0])
+	}
+
+	// Copy the list of page ids from the freelist.
+	if count == 0 {
+		f.ids = nil
+	} else {
+		// The ids start at idx and span count entries.
+		ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx : idx+count]
+		f.ids = make([]pgid, len(ids))
+		copy(f.ids, ids)
+
+		// Make sure they're sorted.
+		sort.Sort(pgids(f.ids))
+	}
+
+	// Rebuild the page cache.
+	f.reindex()
+}
+
+// write writes the page ids onto a freelist page. All free and pending ids are
+// saved to disk since in the event of a program crash, all pending ids will
+// become free.
+func (f *freelist) write(p *page) error {
+	// Combine the old free pgids and pgids waiting on an open transaction.
+	ids := f.all()
+
+	// Update the header flag.
+	p.flags |= pageFlagFreelist
+
+	// The page.count can only hold up to 64k elements so if we overflow that
+	// number then we handle it by putting the size in the first element.
+	if len(ids) == 0 {
+		p.count = uint16(len(ids))
+	} else if len(ids) < 0xFFFF {
+		p.count = uint16(len(ids))
+		copy(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[:], ids)
+	} else {
+		p.count = 0xFFFF
+		((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0] = pgid(len(ids))
+		copy(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[1:], ids)
+	}
+
+	return nil
+}
+
+// reload reads the freelist from a page and filters out pending items.
+func (f *freelist) reload(p *page) {
+	f.read(p)
+
+	// Build a cache of only pending pages.
+	pcache := make(map[pgid]bool)
+	for _, pendingIDs := range f.pending {
+		for _, pendingID := range pendingIDs {
+			pcache[pendingID] = true
+		}
+	}
+
+	// Check each page in the freelist and build a new available freelist
+	// with any pages not in the pending lists.
+	var a []pgid
+	for _, id := range f.ids {
+		if !pcache[id] {
+			a = append(a, id)
+		}
+	}
+	f.ids = a
+
+	// Once the available list is rebuilt then rebuild the free cache so that
+	// it includes the available and pending free pages.
+	f.reindex()
+}
+
+// reindex rebuilds the free cache based on available and pending free lists.
+func (f *freelist) reindex() {
+	f.cache = make(map[pgid]bool)
+	for _, id := range f.ids {
+		f.cache[id] = true
+	}
+	for _, pendingIDs := range f.pending {
+		for _, pendingID := range pendingIDs {
+			f.cache[pendingID] = true
+		}
+	}
+}
diff --git a/pages/freelist_test.go b/pages/freelist_test.go
new file mode 100644
index 0000000000..9176397ec6
--- /dev/null
+++ b/pages/freelist_test.go
@@ -0,0 +1,158 @@
+package pages
+
+import (
+	"math/rand"
+	"reflect"
+	"sort"
+	"testing"
+	"unsafe"
+)
+
+// Ensure that a page is added to a transaction's freelist.
+func TestFreelist_free(t *testing.T) {
+	f := newFreelist()
+	f.free(100, &page{id: 12})
+	if !reflect.DeepEqual([]pgid{12}, f.pending[100]) {
+		t.Fatalf("exp=%v; got=%v", []pgid{12}, f.pending[100])
+	}
+}
+
+// Ensure that a page and its overflow is added to a transaction's freelist.
+func TestFreelist_free_overflow(t *testing.T) {
+	f := newFreelist()
+	f.free(100, &page{id: 12, overflow: 3})
+	if exp := []pgid{12, 13, 14, 15}; !reflect.DeepEqual(exp, f.pending[100]) {
+		t.Fatalf("exp=%v; got=%v", exp, f.pending[100])
+	}
+}
+
+// Ensure that a transaction's free pages can be released.
+func TestFreelist_release(t *testing.T) {
+	f := newFreelist()
+	f.free(100, &page{id: 12, overflow: 1})
+	f.free(100, &page{id: 9})
+	f.free(102, &page{id: 39})
+	f.release(100)
+	f.release(101)
+	if exp := []pgid{9, 12, 13}; !reflect.DeepEqual(exp, f.ids) {
+		t.Fatalf("exp=%v; got=%v", exp, f.ids)
+	}
+
+	f.release(102)
+	if exp := []pgid{9, 12, 13, 39}; !reflect.DeepEqual(exp, f.ids) {
+		t.Fatalf("exp=%v; got=%v", exp, f.ids)
+	}
+}
+
+// Ensure that a freelist can find contiguous blocks of pages.
+func TestFreelist_allocate(t *testing.T) {
+	f := &freelist{ids: []pgid{3, 4, 5, 6, 7, 9, 12, 13, 18}}
+	if id := int(f.allocate(3)); id != 3 {
+		t.Fatalf("exp=3; got=%v", id)
+	}
+	if id := int(f.allocate(1)); id != 6 {
+		t.Fatalf("exp=6; got=%v", id)
+	}
+	if id := int(f.allocate(3)); id != 0 {
+		t.Fatalf("exp=0; got=%v", id)
+	}
+	if id := int(f.allocate(2)); id != 12 {
+		t.Fatalf("exp=12; got=%v", id)
+	}
+	if id := int(f.allocate(1)); id != 7 {
+		t.Fatalf("exp=7; got=%v", id)
+	}
+	if id := int(f.allocate(0)); id != 0 {
+		t.Fatalf("exp=0; got=%v", id)
+	}
+	if id := int(f.allocate(0)); id != 0 {
+		t.Fatalf("exp=0; got=%v", id)
+	}
+	if exp := []pgid{9, 18}; !reflect.DeepEqual(exp, f.ids) {
+		t.Fatalf("exp=%v; got=%v", exp, f.ids)
+	}
+
+	if id := int(f.allocate(1)); id != 9 {
+		t.Fatalf("exp=9; got=%v", id)
+	}
+	if id := int(f.allocate(1)); id != 18 {
+		t.Fatalf("exp=18; got=%v", id)
+	}
+	if id := int(f.allocate(1)); id != 0 {
+		t.Fatalf("exp=0; got=%v", id)
+	}
+	if exp := []pgid{}; !reflect.DeepEqual(exp, f.ids) {
+		t.Fatalf("exp=%v; got=%v", exp, f.ids)
+	}
+}
+
+// Ensure that a freelist can deserialize from a freelist page.
+func TestFreelist_read(t *testing.T) {
+	// Create a page.
+	var buf [4096]byte
+	page := (*page)(unsafe.Pointer(&buf[0]))
+	page.flags = pageFlagFreelist
+	page.count = 2
+
+	// Insert 2 page ids.
+	ids := (*[3]pgid)(unsafe.Pointer(&page.ptr))
+	ids[0] = 23
+	ids[1] = 50
+
+	// Deserialize page into a freelist.
+	f := newFreelist()
+	f.read(page)
+
+	// Ensure that there are two page ids in the freelist.
+	if exp := []pgid{23, 50}; !reflect.DeepEqual(exp, f.ids) {
+		t.Fatalf("exp=%v; got=%v", exp, f.ids)
+	}
+}
+
+// Ensure that a freelist can serialize into a freelist page.
+func TestFreelist_write(t *testing.T) {
+	// Create a freelist and write it to a page.
+	var buf [4096]byte
+	f := &freelist{ids: []pgid{12, 39}, pending: make(map[txid][]pgid)}
+	f.pending[100] = []pgid{28, 11}
+	f.pending[101] = []pgid{3}
+	p := (*page)(unsafe.Pointer(&buf[0]))
+	if err := f.write(p); err != nil {
+		t.Fatal(err)
+	}
+
+	// Read the page back out.
+	f2 := newFreelist()
+	f2.read(p)
+
+	// Ensure that the freelist is correct.
+	// All pages should be present and in sorted order.
+ if exp := []pgid{3, 11, 12, 28, 39}; !reflect.DeepEqual(exp, f2.ids) { + t.Fatalf("exp=%v; got=%v", exp, f2.ids) + } +} + +func Benchmark_FreelistRelease10K(b *testing.B) { benchmark_FreelistRelease(b, 10000) } +func Benchmark_FreelistRelease100K(b *testing.B) { benchmark_FreelistRelease(b, 100000) } +func Benchmark_FreelistRelease1000K(b *testing.B) { benchmark_FreelistRelease(b, 1000000) } +func Benchmark_FreelistRelease10000K(b *testing.B) { benchmark_FreelistRelease(b, 10000000) } + +func benchmark_FreelistRelease(b *testing.B, size int) { + ids := randomPgids(size) + pending := randomPgids(len(ids) / 400) + b.ResetTimer() + for i := 0; i < b.N; i++ { + f := &freelist{ids: ids, pending: map[txid][]pgid{1: pending}} + f.release(1) + } +} + +func randomPgids(n int) []pgid { + rand.Seed(42) + pgids := make(pgids, n) + for i := range pgids { + pgids[i] = pgid(rand.Int63()) + } + sort.Sort(pgids) + return pgids +} diff --git a/pages/page.go b/pages/page.go new file mode 100644 index 0000000000..8c5020734a --- /dev/null +++ b/pages/page.go @@ -0,0 +1,103 @@ +package pages + +import ( + "fmt" + "os" + "sort" + "unsafe" +) + +const PageHeaderSize = int(unsafe.Offsetof(((*page)(nil)).ptr)) + +const ( + // pageFlag{head,tail,body}? + pageFlagMeta = 0x02 + pageFlagFreelist = 0x04 + pageFlagData = 0x08 +) + +type pgid uint64 + +type page struct { + id pgid + flags uint16 + count uint16 + overflow uint32 + ptr uintptr +} + +// typ returns a human readable page type string. +func (p *page) typ() string { + if (p.flags & pageFlagMeta) != 0 { + return "meta" + } else if (p.flags & pageFlagFreelist) != 0 { + return "freelist" + } else if (p.flags & pageFlagData) != 0 { + return "data" + } + return fmt.Sprintf("unknown<%02x>", p.flags) +} + +func (p *page) String() string { + return fmt.Sprintf("page<%s,%016x>", p.typ(), p.id) +} + +// meta returns a pointer to the metadata section of a page. +func (p *page) meta() *meta { + return (*meta)(unsafe.Pointer(&p.ptr)) +} + +// dump writes n bytes of the page to STDERR as hex output. +func (p *page) hexdump(n int) { + buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:n] + fmt.Fprintf(os.Stderr, "%x\n", buf) +} + +type pages []*page + +func (s pages) Len() int { return len(s) } +func (s pages) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s pages) Less(i, j int) bool { return s[i].id < s[j].id } + +type pgids []pgid + +func (s pgids) Len() int { return len(s) } +func (s pgids) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s pgids) Less(i, j int) bool { return s[i] < s[j] } + +// merge returns the sorted union of a and b. +func (a pgids) merge(b pgids) pgids { + // Return the opposite slice if one is nil. + if len(a) == 0 { + return b + } else if len(b) == 0 { + return a + } + + // Create a list to hold all elements from both lists. + merged := make(pgids, 0, len(a)+len(b)) + + // Assign lead to the slice with a lower starting value, follow to the higher value. + lead, follow := a, b + if b[0] < a[0] { + lead, follow = b, a + } + + // Continue while there are elements in the lead. + for len(lead) > 0 { + // Merge largest prefix of lead that is ahead of follow[0]. + n := sort.Search(len(lead), func(i int) bool { return lead[i] > follow[0] }) + merged = append(merged, lead[:n]...) + if n >= len(lead) { + break + } + + // Swap lead and follow. + lead, follow = follow, lead[n:] + } + + // Append what's left in follow. + merged = append(merged, follow...) 
+
+	return merged
+}
diff --git a/pages/page_test.go b/pages/page_test.go
new file mode 100644
index 0000000000..3d0d57342b
--- /dev/null
+++ b/pages/page_test.go
@@ -0,0 +1,69 @@
+package pages
+
+import (
+	"reflect"
+	"sort"
+	"testing"
+	"testing/quick"
+)
+
+// Ensure that the page type can be returned in human readable format.
+func TestPage_typ(t *testing.T) {
+	if typ := (&page{flags: pageFlagData}).typ(); typ != "data" {
+		t.Fatalf("exp=data; got=%v", typ)
+	}
+	if typ := (&page{flags: pageFlagMeta}).typ(); typ != "meta" {
+		t.Fatalf("exp=meta; got=%v", typ)
+	}
+	if typ := (&page{flags: pageFlagFreelist}).typ(); typ != "freelist" {
+		t.Fatalf("exp=freelist; got=%v", typ)
+	}
+	if typ := (&page{flags: 20000}).typ(); typ != "unknown<4e20>" {
+		t.Fatalf("exp=unknown<4e20>; got=%v", typ)
+	}
+}
+
+// Ensure that the hexdump debugging function doesn't blow up.
+func TestPage_dump(t *testing.T) {
+	(&page{id: 256}).hexdump(16)
+}
+
+func TestPgids_merge(t *testing.T) {
+	a := pgids{4, 5, 6, 10, 11, 12, 13, 27}
+	b := pgids{1, 3, 8, 9, 25, 30}
+	c := a.merge(b)
+	if !reflect.DeepEqual(c, pgids{1, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 25, 27, 30}) {
+		t.Errorf("mismatch: %v", c)
+	}
+
+	a = pgids{4, 5, 6, 10, 11, 12, 13, 27, 35, 36}
+	b = pgids{8, 9, 25, 30}
+	c = a.merge(b)
+	if !reflect.DeepEqual(c, pgids{4, 5, 6, 8, 9, 10, 11, 12, 13, 25, 27, 30, 35, 36}) {
+		t.Errorf("mismatch: %v", c)
+	}
+}
+
+func TestPgids_merge_quick(t *testing.T) {
+	if err := quick.Check(func(a, b pgids) bool {
+		// Sort incoming lists.
+		sort.Sort(a)
+		sort.Sort(b)
+
+		// Merge the two lists together.
+		got := a.merge(b)
+
+		// The expected value should be the two lists combined and sorted.
+		exp := append(a, b...)
+		sort.Sort(exp)
+
+		if !reflect.DeepEqual(exp, got) {
+			t.Errorf("\nexp=%+v\ngot=%+v\n", exp, got)
+			return false
+		}
+
+		return true
+	}, nil); err != nil {
+		t.Fatal(err)
+	}
+}
diff --git a/pages/pages_amd64.go b/pages/pages_amd64.go
new file mode 100644
index 0000000000..776f8dff2d
--- /dev/null
+++ b/pages/pages_amd64.go
@@ -0,0 +1,7 @@
+package pages
+
+// maxMapSize represents the largest mmap size supported by pagebuf.
+const maxMapSize = 0xFFFFFFFFFFFF // 256TB
+
+// maxAllocSize is the size used when creating array pointers.
+const maxAllocSize = 0x7FFFFFFF
diff --git a/pages/pages_linux.go b/pages/pages_linux.go
new file mode 100644
index 0000000000..c56a396a80
--- /dev/null
+++ b/pages/pages_linux.go
@@ -0,0 +1,10 @@
+package pages
+
+import (
+	"syscall"
+)
+
+// fdatasync flushes written data to a file descriptor.
+func fdatasync(pb *DB) error {
+	return syscall.Fdatasync(int(pb.file.Fd()))
+}
diff --git a/pages/pages_unix.go b/pages/pages_unix.go
new file mode 100644
index 0000000000..b413a073e9
--- /dev/null
+++ b/pages/pages_unix.go
@@ -0,0 +1,89 @@
+// +build !windows,!plan9,!solaris
+
+package pages
+
+import (
+	"fmt"
+	"os"
+	"syscall"
+	"time"
+	"unsafe"
+)
+
+// flock acquires an advisory lock on a file descriptor.
+func flock(pb *DB, mode os.FileMode, exclusive bool, timeout time.Duration) error {
+	var t time.Time
+	for {
+		// If we're beyond our timeout then return an error.
+		// This can only occur after we've attempted a flock once.
+		if t.IsZero() {
+			t = time.Now()
+		} else if timeout > 0 && time.Since(t) > timeout {
+			return ErrTimeout
+		}
+		flag := syscall.LOCK_SH
+		if exclusive {
+			flag = syscall.LOCK_EX
+		}
+
+		// Attempt to obtain the (shared or exclusive) lock.
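+		// LOCK_NB makes the flock call non-blocking so the surrounding loop
+		// can enforce the caller-supplied timeout.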
+ err := syscall.Flock(int(pb.file.Fd()), flag|syscall.LOCK_NB) + if err == nil { + return nil + } else if err != syscall.EWOULDBLOCK { + return err + } + + // Wait for a bit and try again. + time.Sleep(50 * time.Millisecond) + } +} + +// funlock releases an advisory lock on a file descriptor. +func funlock(pb *DB) error { + return syscall.Flock(int(pb.file.Fd()), syscall.LOCK_UN) +} + +// mmap memory maps a PageBuf's data file. +func mmap(pb *DB, sz int) error { + // Map the data file to memory. + b, err := syscall.Mmap(int(pb.file.Fd()), 0, sz, syscall.PROT_READ, syscall.MAP_SHARED|pb.MmapFlags) + if err != nil { + return err + } + + // Advise the kernel that the mmap is accessed randomly. + if err := madvise(b, syscall.MADV_RANDOM); err != nil { + return fmt.Errorf("madvise: %s", err) + } + + // Save the original byte slice and convert to a byte array pointer. + pb.dataref = b + pb.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0])) + pb.datasz = sz + return nil +} + +// munmap unmaps a PageBuf's data file from memory. +func munmap(pb *DB) error { + // Ignore the unmap if we have no mapped data. + if pb.dataref == nil { + return nil + } + + // Unmap using the original byte slice. + err := syscall.Munmap(pb.dataref) + pb.dataref = nil + pb.data = nil + pb.datasz = 0 + return err +} + +// NOTE: This function is copied from stdlib because it is not available on darwin. +func madvise(b []byte, advice int) (err error) { + _, _, e1 := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice)) + if e1 != 0 { + err = e1 + } + return +} diff --git a/pages/pagessync_unix.go b/pages/pagessync_unix.go new file mode 100644 index 0000000000..43a9208c9e --- /dev/null +++ b/pages/pagessync_unix.go @@ -0,0 +1,8 @@ +// +build !windows,!plan9,!linux,!openbsd + +package pages + +// fdatasync flushes written data to a file descriptor. +func fdatasync(pb *DB) error { + return pb.file.Sync() +} diff --git a/pages/tx.go b/pages/tx.go new file mode 100644 index 0000000000..e2e9d065ce --- /dev/null +++ b/pages/tx.go @@ -0,0 +1,384 @@ +package pages + +import ( + "fmt" + "sort" + "unsafe" +) + +// txid represents the internal transaction identifier. +type txid uint64 + +// Tx represents a read-only or read/write transaction on the page buffer. +// Read-only transactions can be used for retrieving pages. +// Read/write transactions can retrieve and write pages. +// +// IMPORTANT: You must commit or rollback transactions when you are done with +// them. Pages can not be reclaimed by the writer until no more transactions +// are using them. A long running read transaction can cause the database to +// quickly grow. +type Tx struct { + writable bool + managed bool + db *DB + meta *meta + pages map[pgid]*page + delPages map[pgid]bool + + // WriteFlag specifies the flag for write-related methods like WriteTo(). + // Tx opens the database file with the specified flag to copy the data. + // + // By default, the flag is unset, which works well for mostly in-memory + // workloads. For databases that are much larger than available RAM, + // set the flag to syscall.O_DIRECT to avoid trashing the page cache. + WriteFlag int +} + +// init initializes the transaction. +func (tx *Tx) init(db *DB) { + tx.db = db + tx.pages = nil + + // Copy the meta page since it can be changed by the writer. + tx.meta = &meta{} + db.meta().copy(tx.meta) + + // Increment the transaction id and add a page cache for writable transactions. 
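+	// Read-only transactions keep tx.pages nil and serve all reads directly
+	// from the shared mmap (see Tx.page below).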
+ if tx.writable { + tx.pages = make(map[pgid]*page) + tx.delPages = make(map[pgid]bool) + tx.meta.txid += txid(1) + } +} + +// ID returns the transaction id. +func (tx *Tx) ID() uint64 { + return uint64(tx.meta.txid) +} + +// Size returns current database size in bytes as seen by this transaction. +func (tx *Tx) Size() int64 { + return int64(tx.meta.pgid) * int64(tx.db.pageSize) +} + +// DB returns a reference to the database that created the transaction. +func (tx *Tx) DB() *DB { + return tx.db +} + +// Writable returns whether the transaction can perform write operations. +func (tx *Tx) Writable() bool { + return tx.writable +} + +// Rollback closes the transaction and ignores all previous updates. Read-only +// transactions must be rolled back and not committed. +func (tx *Tx) Rollback() error { + _assert(!tx.managed, "managed tx rollback not allowed") + if tx.db == nil { + return ErrTxClosed + } + tx.rollback() + return nil +} + +func (tx *Tx) rollback() { + if tx.db == nil { + return + } + if tx.writable { + tx.db.freelist.rollback(tx.meta.txid) + tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist)) + } + tx.close() +} + +func (tx *Tx) close() { + if tx.db == nil { + return + } + if tx.writable { + // Remove transaction ref & writer lock. + tx.db.rwtx = nil + tx.db.rwlock.Unlock() + } else { + tx.db.removeTx(tx) + } + + // Clear all references. + tx.db = nil + tx.meta = nil + tx.pages = nil +} + +// page returns a reference to the page with a given id. +// If page has been written to then a temporary buffered page is returned. +func (tx *Tx) page(id pgid) *page { + // Check the dirty pages first. + if tx.pages != nil { + if p, ok := tx.pages[id]; ok { + return p + } + } + + // Otherwise return directly from the mmap. + return tx.db.page(id) +} + +func (tx *Tx) pageExists(id pgid) bool { + // Check whether the page was modified during this transaction. + if tx.pages != nil { + if _, ok := tx.pages[id]; ok { + return true + } + } + // Check whether page was deleted during this transaction. + if tx.delPages != nil { + if tx.delPages[id] { + return false + } + } + // The page was not touched within this transaction. Fallthrough to + // the database's check. + return tx.db.pageExists(id) +} + +// allocate returns a contiguous block of memory starting at a given page. +func (tx *Tx) allocate(count int) (*page, error) { + p, err := tx.db.allocate(count) + if err != nil { + return nil, err + } + + // Save to our page cache. + tx.pages[p.id] = p + return p, nil +} + +// Commit writes all changes to disk and updates the meta page. +// Returns an error if a disk write error occurs, or if Commit is +// called on a read-only transaction. +func (tx *Tx) Commit() error { + _assert(!tx.managed, "managed tx commit not allowed") + if tx.db == nil { + return ErrTxClosed + } else if !tx.writable { + return ErrTxNotWritable + } + + // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. + + opgid := tx.meta.pgid + + // Free the freelist and allocate new pages for it. This will overestimate + // the size of the freelist but not underestimate the size (which would be bad). + tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist)) + p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1) + if err != nil { + tx.rollback() + return err + } + if err := tx.db.freelist.write(p); err != nil { + tx.rollback() + return err + } + tx.meta.freelist = p.id + + // If the high water mark has moved up then attempt to grow the database. 
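+	// grow returns early when the file is already large enough, so this is
+	// cheap in the common case.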
+ if tx.meta.pgid > opgid { + if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil { + tx.rollback() + return err + } + } + + // Write dirty pages to disk. + if err := tx.write(); err != nil { + tx.rollback() + return err + } + + // Write meta to disk. + if err := tx.writeMeta(); err != nil { + tx.rollback() + return err + } + + // Finalize the transaction. + tx.close() + + return nil +} + +// write writes any dirty pages to disk. +func (tx *Tx) write() error { + // Sort pages by id. + pages := make(pages, 0, len(tx.pages)) + for _, p := range tx.pages { + pages = append(pages, p) + } + // Clear out page cache early. + tx.pages = make(map[pgid]*page) + sort.Sort(pages) + + // Write pages to disk in order. + for _, p := range pages { + size := (int(p.overflow) + 1) * tx.db.pageSize + offset := int64(p.id) * int64(tx.db.pageSize) + + // Write out page in "max allocation" sized chunks. + ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p)) + for { + // Limit our write to our max allocation size. + sz := size + if sz > maxAllocSize-1 { + sz = maxAllocSize - 1 + } + + // Write chunk to disk. + buf := ptr[:sz] + if _, err := tx.db.ops.writeAt(buf, offset); err != nil { + return err + } + + // Exit inner for loop if we've written all the chunks. + size -= sz + if size == 0 { + break + } + + // Otherwise move offset forward and move pointer to next chunk. + offset += int64(sz) + ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz])) + } + } + + if err := fdatasync(tx.db); err != nil { + return err + } + + // Put small pages back to page pool. + for _, p := range pages { + // Ignore page sizes over 1 page. + // These are allocated using make() instead of the page pool. + if int(p.overflow) != 0 { + continue + } + + buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:tx.db.pageSize] + + // See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1 + for i := range buf { + buf[i] = 0 + } + tx.db.pagePool.Put(buf) + } + + return nil +} + +// writeMeta writes the meta to the disk. +func (tx *Tx) writeMeta() error { + // Create a temporary buffer for the meta page. + buf := make([]byte, tx.db.pageSize) + p := tx.db.pageInBuffer(buf, 0) + tx.meta.write(p) + + // Write the meta page to file. + if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil { + return err + } + if err := fdatasync(tx.db); err != nil { + return err + } + + return nil +} + +// Get retrieves the bytes stored in the page with the given id. +// The returned byte slice is only valid for the duration of the transaction. +func (tx *Tx) Get(id uint64) ([]byte, error) { + if !tx.pageExists(pgid(id)) { + return nil, ErrNotFound + } + + p := tx.page(pgid(id)) + size := int(p.overflow)*tx.db.pageSize - PageHeaderSize + int(p.count) + b := (*[maxAllocSize]byte)(unsafe.Pointer(&p.ptr))[:size] + + return b, nil +} + +// Add creates a new page with the given content. The inserted byte slice +// will be padded at the end to fit the next largest page size. Retrieving the page +// will return the padding as well. +// Inserted data should hence have included termination markers. 
+func (tx *Tx) Add(c []byte) (uint64, error) {
+	l := len(c) + PageHeaderSize // total size required
+	n := 1                       // number of pages required
+	for n*tx.db.pageSize < l {
+		n++
+	}
+	if l > maxAllocSize {
+		return 0, fmt.Errorf("page of size %d too large", l)
+	}
+	p, err := tx.allocate(n)
+	if err != nil {
+		return 0, fmt.Errorf("page alloc error: %s", err)
+	}
+	p.flags |= pageFlagData
+	// count holds the length used in the last page.
+	p.count = uint16(l - (n-1)*tx.db.pageSize)
+
+	b := (*[maxAllocSize]byte)(unsafe.Pointer(&p.ptr))[:]
+	copy(b, c)
+
+	return uint64(p.id), nil
+}
+
+// Del deletes the page with the given ID.
+func (tx *Tx) Del(id uint64) error {
+	if !tx.pageExists(pgid(id)) {
+		return ErrNotFound
+	}
+
+	tx.db.freelist.free(tx.meta.txid, tx.db.page(pgid(id)))
+	return nil
+}
+
+// Set overwrites the page with the given ID with c.
+func (tx *Tx) Set(id uint64, c []byte) error {
+	if !tx.pageExists(pgid(id)) {
+		return ErrNotFound
+	}
+	p := tx.db.page(pgid(id))
+
+	l := len(c) + PageHeaderSize // total size required
+	n := int(p.overflow + 1)
+	// The contents must fit into the previously allocated pages.
+	if l > n*tx.db.pageSize {
+		return fmt.Errorf("invalid overwrite size")
+	}
+
+	// Allocate a temporary buffer for the page.
+	var buf []byte
+	if n == 1 {
+		buf = tx.db.pagePool.Get().([]byte)
+	} else {
+		buf = make([]byte, n*tx.db.pageSize)
+	}
+	np := tx.db.pageInBuffer(buf, 0)
+	*np = *p
+	// count holds the length used in the last page.
+	np.count = uint16(l - (n-1)*tx.db.pageSize)
+
+	// TODO(fabxc): Potential performance improvement point could be using c directly.
+	// Just copy it for now.
+	b := (*[maxAllocSize]byte)(unsafe.Pointer(&np.ptr))[:]
+	copy(b, c)
+
+	tx.pages[pgid(id)] = np
+	// TODO(fabxc): truncate and free pages that are no longer needed.
+
+	return nil
+}
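For orientation, here is a minimal usage sketch of the pagebuf API added above (the bare "pages" import path and the file name are assumptions for illustration; error handling is kept short). It opens a DB, stores a blob in a writable transaction, and reads it back in a read-only one:

package main

import (
	"fmt"
	"log"

	"pages" // assumed import path for the pagebuf package above
)

func main() {
	// Open with nil options to get DefaultOptions (OS page size).
	db, err := pages.Open("data.pbuf", 0666, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var id uint64
	// Writable transaction: Add pads the blob to the next page boundary,
	// so we include our own terminator as the Add doc comment suggests.
	if err := db.Update(func(tx *pages.Tx) error {
		var err error
		id, err = tx.Add([]byte("hello, pagebuf\x00"))
		return err
	}); err != nil {
		log.Fatal(err)
	}

	// Read-only transaction: the slice returned by Get is only valid
	// for the duration of the transaction.
	if err := db.View(func(tx *pages.Tx) error {
		b, err := tx.Get(id)
		if err != nil {
			return err
		}
		fmt.Printf("%q\n", b[:15])
		return nil
	}); err != nil {
		log.Fatal(err)
	}
}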