From 5e53f767c426c9f1cc4be033fe5aa7254099414c Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Thu, 14 Oct 2021 11:11:07 -0700 Subject: [PATCH] Use concurrent bz2 decompression (#13360) Tested with `mc sql --compression BZIP2 --csv-input "rd=\n,fh=USE,fd=;" --query="select COUNT(*) from S3Object" local2/testbucket/nyc-taxi-data-10M.csv.bz2`: before 96.98s, after 10.79s. Uses about 70% CPU while running. --- cmd/untar.go | 8 ++++++-- go.mod | 1 + go.sum | 2 ++ internal/s3select/progress.go | 24 ++++++++++++++++++++++-- 4 files changed, 31 insertions(+), 4 deletions(-) diff --git a/cmd/untar.go b/cmd/untar.go index 6ba374e9a..b13d313ea 100644 --- a/cmd/untar.go +++ b/cmd/untar.go @@ -21,12 +21,14 @@ import ( "archive/tar" "bufio" "bytes" - "compress/bzip2" + "context" "fmt" "io" "os" "path" + "runtime" + "github.com/cosnicolaou/pbzip2" "github.com/klauspost/compress/s2" "github.com/klauspost/compress/zstd" gzip "github.com/klauspost/pgzip" @@ -112,7 +114,9 @@ func untar(r io.Reader, putObject func(reader io.Reader, info os.FileInfo, name defer dec.Close() r = dec case formatBZ2: - r = bzip2.NewReader(bf) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + r = pbzip2.NewReader(ctx, bf, pbzip2.DecompressionOptions(pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2))) case formatLZ4: r = lz4.NewReader(bf) case formatUnknown: diff --git a/go.mod b/go.mod index 6830be51b..d655cbdea 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/cheggaaa/pb v1.0.29 github.com/colinmarc/hdfs/v2 v2.2.0 github.com/coredns/coredns v1.4.0 + github.com/cosnicolaou/pbzip2 v1.0.0 github.com/dchest/siphash v1.2.1 github.com/djherbis/atime v1.0.0 github.com/dswarbrick/smart v0.0.0-20190505152634-909a45200d6d diff --git a/go.sum b/go.sum index 2771507cf..9d4c806e7 100644 --- a/go.sum +++ b/go.sum @@ -258,6 +258,8 @@ github.com/coreos/go-systemd/v22 v22.3.1/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/coreos/pkg
v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180108230652-97fdf19511ea/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= +github.com/cosnicolaou/pbzip2 v1.0.0 h1:T/807kTuUNv7hYj4eYTIdGuJ41N5EcpYX6cOMm46Bdc= +github.com/cosnicolaou/pbzip2 v1.0.0/go.mod h1:cE04zhBMvwMrCLhsx6aLYh9cGsU9GyFB0oo/GmO+SkY= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= diff --git a/internal/s3select/progress.go b/internal/s3select/progress.go index 488a83243..8f2a490e6 100644 --- a/internal/s3select/progress.go +++ b/internal/s3select/progress.go @@ -18,13 +18,15 @@ package s3select import ( - "compress/bzip2" + "context" "errors" "fmt" "io" + "runtime" "sync" "sync/atomic" + "github.com/cosnicolaou/pbzip2" "github.com/klauspost/compress/s2" "github.com/klauspost/compress/zstd" gzip "github.com/klauspost/pgzip" @@ -121,7 +123,9 @@ func newProgressReader(rc io.ReadCloser, compType CompressionType) (*progressRea r = gzr pr.closer = gzr case bzip2Type: - r = bzip2.NewReader(scannedReader) + ctx, cancel := context.WithCancel(context.Background()) + r = pbzip2.NewReader(ctx, scannedReader, pbzip2.DecompressionOptions(pbzip2.BZConcurrency((runtime.GOMAXPROCS(0)+1)/2))) + pr.closer = &nopReadCloser{fn: cancel} case zstdType: // Set a max window of 64MB. More than reasonable. 
zr, err := zstd.NewReader(scannedReader, zstd.WithDecoderConcurrency(2), zstd.WithDecoderMaxWindow(64<<20)) @@ -143,3 +147,19 @@ func newProgressReader(rc io.ReadCloser, compType CompressionType) (*progressRea return &pr, nil } + +type nopReadCloser struct { + fn func() +} + +func (n2 *nopReadCloser) Read(p []byte) (n int, err error) { + panic("should not be called") +} + +func (n2 *nopReadCloser) Close() error { + if n2.fn != nil { + n2.fn() + } + n2.fn = nil + return nil +}