diff --git a/cmd/object-handlers.go b/cmd/object-handlers.go index 9e9907c8b..4635a0fac 100644 --- a/cmd/object-handlers.go +++ b/cmd/object-handlers.go @@ -189,28 +189,17 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r getObjectNInfo := objectAPI.GetObjectNInfo if api.CacheAPI() != nil { getObjectNInfo = api.CacheAPI().GetObjectNInfo - } - - getObject := func(offset, length int64) (rc io.ReadCloser, err error) { - isSuffixLength := false - if offset < 0 { - isSuffixLength = true + } else { + // Take read lock on object, here so subsequent lower-level + // calls do not need to. + lock := objectAPI.NewNSLock(bucket, object) + lkctx, err := lock.GetRLock(ctx, globalOperationTimeout) + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return } - - if length > 0 { - length-- - } - - rs := &HTTPRangeSpec{ - IsSuffixLength: isSuffixLength, - Start: offset, - End: offset + length, - } - if length == -1 { - rs.End = -1 - } - - return getObjectNInfo(ctx, bucket, object, rs, r.Header, readLock, opts) + ctx = lkctx.Context() + defer lock.RUnlock(lkctx.Cancel) } objInfo, err := getObjectInfo(ctx, bucket, object, opts) @@ -241,6 +230,24 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r } } + actualSize, err := objInfo.GetActualSize() + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return + } + + objectRSC := s3select.NewObjectReadSeekCloser( + func(offset int64) (io.ReadCloser, error) { + rs := &HTTPRangeSpec{ + IsSuffixLength: false, + Start: offset, + End: -1, + } + return getObjectNInfo(ctx, bucket, object, rs, r.Header, noLock, opts) + }, + actualSize, + ) + s3Select, err := s3select.NewS3Select(r.Body) if err != nil { if serr, ok := err.(s3select.SelectError); ok { @@ -261,7 +268,7 @@ func (api objectAPIHandlers) SelectObjectContentHandler(w http.ResponseWriter, r } defer s3Select.Close() - if err = s3Select.Open(getObject); err != nil { + if err = s3Select.Open(objectRSC); err != nil { if serr, ok := err.(s3select.SelectError); ok { encodedErrorResponse := encodeResponse(APIErrorResponse{ Code: serr.ErrorCode(), @@ -4191,23 +4198,26 @@ func (api objectAPIHandlers) PostRestoreObjectHandler(w http.ResponseWriter, r * go func() { rctx := GlobalContext if !rreq.SelectParameters.IsEmpty() { - getObject := func(offset, length int64) (rc io.ReadCloser, err error) { - isSuffixLength := false - if offset < 0 { - isSuffixLength = true - } - - rs := &HTTPRangeSpec{ - IsSuffixLength: isSuffixLength, - Start: offset, - End: offset + length, - } - - return getTransitionedObjectReader(rctx, bucket, object, rs, r.Header, objInfo, ObjectOptions{ - VersionID: objInfo.VersionID, - }) + actualSize, err := objInfo.GetActualSize() + if err != nil { + writeErrorResponse(ctx, w, toAPIError(ctx, err), r.URL) + return } - if err = rreq.SelectParameters.Open(getObject); err != nil { + + objectRSC := s3select.NewObjectReadSeekCloser( + func(offset int64) (io.ReadCloser, error) { + rs := &HTTPRangeSpec{ + IsSuffixLength: false, + Start: offset, + End: -1, + } + return getTransitionedObjectReader(rctx, bucket, object, rs, r.Header, + objInfo, ObjectOptions{VersionID: objInfo.VersionID}) + }, + actualSize, + ) + + if err = rreq.SelectParameters.Open(objectRSC); err != nil { if serr, ok := err.(s3select.SelectError); ok { encodedErrorResponse := encodeResponse(APIErrorResponse{ Code: serr.ErrorCode(), diff --git a/go.mod b/go.mod index ecc68b82d..c5e859704 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/elastic/go-elasticsearch/v7 v7.12.0 github.com/fatih/color v1.13.0 github.com/felixge/fgprof v0.9.2 + github.com/fraugster/parquet-go v0.10.0 github.com/go-ldap/ldap/v3 v3.2.4 github.com/go-openapi/loads v0.21.1 github.com/go-sql-driver/mysql v1.6.0 @@ -52,7 +53,6 @@ require ( github.com/minio/kes v0.19.0 github.com/minio/madmin-go v1.3.11 github.com/minio/minio-go/v7 v7.0.23 - github.com/minio/parquet-go v1.1.0 github.com/minio/pkg v1.1.20 github.com/minio/selfupdate v0.4.0 github.com/minio/sha256-simd v1.0.0 @@ -200,7 +200,6 @@ require ( github.com/tidwall/gjson v1.14.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.0 // indirect - github.com/tidwall/sjson v1.2.3 // indirect github.com/tklauser/go-sysconf v0.3.10 // indirect github.com/tklauser/numcpus v0.4.0 // indirect github.com/unrolled/secure v1.10.0 // indirect diff --git a/go.sum b/go.sum index e2b9ffc09..e3d414abc 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,9 @@ github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk5 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/thrift v0.15.0 h1:aGvdaR0v1t9XLgjtBYwxcBvBOTMqClzwE26CHOgjW1Y= github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= +github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de/go.mod h1:DCaWoUhZrYW9p1lxo/cm8EmUOOzAPSEZNGF2DK1dJgw= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs= github.com/armon/go-metrics v0.3.3 h1:a9F4rlj7EWWrbj7BYw8J8+x+ZZkJeqzNyRk8hdPF+ro= @@ -192,8 +195,12 @@ github.com/containerd/ttrpc v0.0.0-20190828154514-0e0f228740de/go.mod h1:PvCDdDG github.com/containerd/typeurl v0.0.0-20180627222232-a93fcdb778cd/go.mod h1:Cm3kwCdlkCfMSHURc+r6fwoGH6/F1hH3S4sg0rLFWPc= github.com/coredns/coredns v1.9.0 h1:M1EF1uups4CYcQGb1z8A97mfoq4BYCw3+xCYcJkOSDc= github.com/coredns/coredns v1.9.0/go.mod h1:czzy6Ofs15Mzn1PXpWoplBCZxoWdGoQUInL9uPSiYME= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-oidc v2.1.0+incompatible h1:sdJrfw8akMnCuUlaZU3tE/uYXFgfqom8DBE9so9EBsM= github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= @@ -202,6 +209,7 @@ github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzA github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cosnicolaou/pbzip2 v1.0.1 h1:f5Ix000Rtl9tr0Ne33wNLtljGl2nAyR4ZirJrz9qg+0= github.com/cosnicolaou/pbzip2 v1.0.1/go.mod h1:cE04zhBMvwMrCLhsx6aLYh9cGsU9GyFB0oo/GmO+SkY= +github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -263,9 +271,11 @@ github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8 github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/frankban/quicktest v1.10.0/go.mod h1:ui7WezCLWMWxVWr1GETZY3smRy0G4KWq9vcPtJmFl7Y= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= -github.com/frankban/quicktest v1.12.1/go.mod h1:qLE0fzW0VuyUAJgPU19zByoIr0HtCHN/r/VLSOOIySU= github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss= github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og= +github.com/fraugster/parquet-go v0.10.0 h1:whX91AO3dkkOnbH9MqD53DZ3rISw+Tnnj5yiqXjSv9Q= +github.com/fraugster/parquet-go v0.10.0/go.mod h1:asQOKX0K/j+F3Xyj87kw7gKrU3yXo9M2hb8STSQKIIw= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gdamore/encoding v1.0.0 h1:+7OoQ1Bc6eTm5niUzBa0Ctsh6JbMW6Ra+YNuAtDBdko= github.com/gdamore/encoding v1.0.0/go.mod h1:alR0ol34c49FCSBLjhosxzcPHQbf2trDkoo5dl+VrEg= @@ -620,6 +630,7 @@ github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magefile/mage v1.10.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= @@ -683,12 +694,9 @@ github.com/minio/mc v0.0.0-20220407151251-dbc09a8bf054/go.mod h1:PIQHcb4uOctKyL/ github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw= github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= -github.com/minio/minio-go/v7 v7.0.10/go.mod h1:td4gW1ldOsj1PbSNS+WYK43j+P1XVhX/8W8awaYlBFo= github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78/go.mod h1:mTh2uJuAbEqdhMVl6CMIIZLUeiMiWtJR4JB8/5g2skw= github.com/minio/minio-go/v7 v7.0.23 h1:NleyGQvAn9VQMU+YHVrgV4CX+EPtxPt/78lHOOTncy4= github.com/minio/minio-go/v7 v7.0.23/go.mod h1:ei5JjmxwHaMrgsMrn4U/+Nmg+d8MKS1U2DAn1ou4+Do= -github.com/minio/parquet-go v1.1.0 h1:j2Fn1/h7Ts/0qzdMZU9oCUKr0IJwRTD9Hg9QJyVaN6A= -github.com/minio/parquet-go v1.1.0/go.mod h1:nnAkbt2CG/DCQ3trcV3uyvwns4VjyoINF5vMqF5efOE= github.com/minio/pkg v1.0.3/go.mod h1:obU54TZ9QlMv0TRaDgQ/JTzf11ZSXxnSfLrm4tMtBP8= github.com/minio/pkg v1.1.20 h1:NhYtoQHw/Sl1ib/lroANFwkQspE0YyTeVR1CMPEff/A= github.com/minio/pkg v1.1.20/go.mod h1:Xo7LQshlxGa9shKwJ7NzQbgW4s8T/Wc1cOStR/eUiMY= @@ -713,6 +721,7 @@ github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eI github.com/mitchellh/go-wordwrap v0.0.0-20150314170334-ad45545899c7/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= @@ -785,13 +794,13 @@ github.com/opencontainers/runtime-spec v0.1.2-0.20190507144316-5b71a03e2700/go.m github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pborman/getopt v0.0.0-20180729010549-6fdd0a2c7117/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/philhofer/fwd v1.1.1/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/philhofer/fwd v1.1.2-0.20210722190033-5c56ac6d0bb9 h1:6ob53CVz+ja2i7easAStApZJlh7sxyq3Cm7g1Di6iqA= github.com/philhofer/fwd v1.1.2-0.20210722190033-5c56ac6d0bb9/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= @@ -866,10 +875,12 @@ github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417/go.mod h1:qe5TWALJ8/a1 github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/xid v1.3.0 h1:6NjYksEUlhurdVehpc7S7dk6DAmcKv8V9gG0FsVN2U4= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/go-glob v1.0.0/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc= github.com/sahilm/fuzzy v0.1.0/go.mod h1:VFvziUEIMCrT6A6tw2RFIXPXXmzXbOsSHF0DOI8ZK9Y= +github.com/scylladb/termtables v0.0.0-20191203121021-c4c0b6d42ff4/go.mod h1:C1a7PQSMz9NShzorzCiG2fk9+xuCgLkPeCvMHYR2OWg= github.com/secure-io/sio-go v0.3.1 h1:dNvY9awjabXTYGsTF1PiCySl9Ltofk9GA3VdWlo7rRc= github.com/secure-io/sio-go v0.3.1/go.mod h1:+xbkjDzPjwh4Axd07pRKSNriS9SCiYksWnZqdnfpQxs= github.com/shirou/gopsutil/v3 v3.22.2 h1:wCrArWFkHYIdDxx/FSfF5RB4dpJYW6t7rcp3+zL8uks= @@ -889,11 +900,27 @@ github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.2-0.20171109065643-2da4a54c5cee/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/cobra v0.0.2-0.20171109065643-2da4a54c5cee/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= +github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= github.com/spf13/pflag v1.0.1-0.20171106142849-4c012f6dcd95/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -905,8 +932,6 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/tidwall/gjson v1.10.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= -github.com/tidwall/gjson v1.11.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.14.0 h1:6aeJ0bzojgWLa82gDQHcx3S0Lr/O51I9bJ5nv6JFx5w= github.com/tidwall/gjson v1.14.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -914,8 +939,6 @@ github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JT github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= -github.com/tidwall/sjson v1.2.3 h1:5+deguEhHSEjmuICXZ21uSSsXotWMA0orU783+Z7Cp8= -github.com/tidwall/sjson v1.2.3/go.mod h1:5WdjKx3AQMvCJ4RG6/2UYT7dLrGvJUV1x4jdTAyGvZs= github.com/tinylib/msgp v1.1.5/go.mod h1:eQsjooMTnV42mHu917E26IogZ2930nFyBQdofk10Udg= github.com/tinylib/msgp v1.1.7-0.20211026165309-e818a1881b0e h1:P5tyWbssToKowBPTA1/EzqPXwrZNc8ZeNPdjgpcDEoI= github.com/tinylib/msgp v1.1.7-0.20211026165309-e818a1881b0e/go.mod h1:g7jEyb18KPe65d9RRhGw+ThaJr5duyBH8eaFgBUor7Y= @@ -927,6 +950,7 @@ github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq// github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ= github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= +github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/unrolled/secure v1.10.0 h1:TBNP42z2AB+2pW9PR6vdbqhlQuv1iTeSVzK1qHjOBzA= github.com/unrolled/secure v1.10.0/go.mod h1:BmF5hyM6tXczk3MpQkFf1hpKSRqCyhqcbiQtiAF7+40= github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= @@ -940,6 +964,7 @@ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yargevad/filepathx v1.0.0 h1:SYcT+N3tYGi+NvazubCNlvgIPbzAk7i7y2dwg3I5FYc= github.com/yargevad/filepathx v1.0.0/go.mod h1:BprfX/gpYNJHJfc35GjRRpVcwWXS89gGulUIU5tK3tA= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= @@ -993,6 +1018,7 @@ go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= golang.org/x/crypto v0.0.0-20171113213409-9f005a07e0d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190418165655-df01cb2cc480/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= @@ -1143,6 +1169,7 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190129075346-302c3dd5f1cc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/internal/s3select/parquet/reader.go b/internal/s3select/parquet/reader.go index 19cf07ffb..7d27c3a35 100644 --- a/internal/s3select/parquet/reader.go +++ b/internal/s3select/parquet/reader.go @@ -18,88 +18,55 @@ package parquet import ( - "fmt" + "errors" "io" "time" "github.com/bcicen/jstream" + parquetgo "github.com/fraugster/parquet-go" + parquettypes "github.com/fraugster/parquet-go/parquet" jsonfmt "github.com/minio/minio/internal/s3select/json" "github.com/minio/minio/internal/s3select/sql" - parquetgo "github.com/minio/parquet-go" - parquetgen "github.com/minio/parquet-go/gen-go/parquet" ) -// Reader - Parquet record reader for S3Select. +// Reader implements reading records from parquet input. type Reader struct { - args *ReaderArgs - reader *parquetgo.Reader + io.Closer + r *parquetgo.FileReader } -// Read - reads single record. -func (r *Reader) Read(dst sql.Record) (rec sql.Record, rerr error) { - defer func() { - if rec := recover(); rec != nil { - rerr = fmt.Errorf("panic reading parquet record: %v", rec) - } - }() - - parquetRecord, err := r.reader.Read() +// NewParquetReader creates a Reader2 from a io.ReadSeekCloser. +func NewParquetReader(rsc io.ReadSeekCloser, _ *ReaderArgs) (r *Reader, err error) { + fr, err := parquetgo.NewFileReader(rsc) if err != nil { - if err != io.EOF { - return nil, errParquetParsingError(err) - } + return nil, errParquetParsingError(err) + } - return nil, err + return &Reader{Closer: rsc, r: fr}, nil +} + +func (pr *Reader) Read(dst sql.Record) (rec sql.Record, rerr error) { + nextRow, err := pr.r.NextRow() + if err != nil { + if err == io.EOF { + return nil, err + } + return nil, errParquetParsingError(err) } kvs := jstream.KVS{} - f := func(name string, v parquetgo.Value) bool { - if v.Value == nil { - kvs = append(kvs, jstream.KV{Key: name, Value: nil}) - return true - } + for _, col := range pr.r.Columns() { var value interface{} - switch v.Type { - case parquetgen.Type_BOOLEAN: - value = v.Value.(bool) - case parquetgen.Type_INT32: - value = int64(v.Value.(int32)) - if v.Schema != nil && v.Schema.ConvertedType != nil { - switch *v.Schema.ConvertedType { - case parquetgen.ConvertedType_DATE: - value = sql.FormatSQLTimestamp(time.Unix(60*60*24*int64(v.Value.(int32)), 0).UTC()) - } + if v, ok := nextRow[col.FlatName()]; ok { + value, err = convertFromAnnotation(col.Element(), v) + if err != nil { + return nil, errParquetParsingError(err) } - case parquetgen.Type_INT64: - value = v.Value.(int64) - if v.Schema != nil && v.Schema.ConvertedType != nil { - switch *v.Schema.ConvertedType { - // Only UTC supported, add one NS to never be exactly midnight. - case parquetgen.ConvertedType_TIMESTAMP_MILLIS: - value = sql.FormatSQLTimestamp(time.Unix(0, 0).Add(time.Duration(v.Value.(int64)) * time.Millisecond).UTC()) - case parquetgen.ConvertedType_TIMESTAMP_MICROS: - value = sql.FormatSQLTimestamp(time.Unix(0, 0).Add(time.Duration(v.Value.(int64)) * time.Microsecond).UTC()) - } - } - case parquetgen.Type_FLOAT: - value = float64(v.Value.(float32)) - case parquetgen.Type_DOUBLE: - value = v.Value.(float64) - case parquetgen.Type_INT96, parquetgen.Type_BYTE_ARRAY, parquetgen.Type_FIXED_LEN_BYTE_ARRAY: - value = string(v.Value.([]byte)) - default: - rerr = errParquetParsingError(nil) - return false } - - kvs = append(kvs, jstream.KV{Key: name, Value: value}) - return true + kvs = append(kvs, jstream.KV{Key: col.FlatName(), Value: value}) } - // Apply our range - parquetRecord.Range(f) - // Reuse destination if we can. dstRec, ok := dst.(*jsonfmt.Record) if !ok { @@ -110,29 +77,67 @@ func (r *Reader) Read(dst sql.Record) (rec sql.Record, rerr error) { return dstRec, nil } -// Close - closes underlying readers. -func (r *Reader) Close() error { - return r.reader.Close() -} - -// NewReader - creates new Parquet reader using readerFunc callback. -func NewReader(getReaderFunc func(offset, length int64) (io.ReadCloser, error), args *ReaderArgs) (r *Reader, err error) { - defer func() { - if rec := recover(); rec != nil { - err = fmt.Errorf("panic reading parquet header: %v", rec) - } - }() - reader, err := parquetgo.NewReader(getReaderFunc, nil) - if err != nil { - if err != io.EOF { - return nil, errParquetParsingError(err) - } - - return nil, err +// convertFromAnnotation - converts values based on the Parquet column's type +// annotations. LogicalType annotations if present override the deprecated +// ConvertedType annotations. Ref: +// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md +func convertFromAnnotation(se *parquettypes.SchemaElement, v interface{}) (interface{}, error) { + if se == nil { + return v, nil } - return &Reader{ - args: args, - reader: reader, - }, nil + var value interface{} + switch val := v.(type) { + case []byte: + // TODO: only strings are supported in s3select output (not + // binary arrays) - perhaps we need to check the annotation to + // ensure it's UTF8 encoded. + value = string(val) + case [12]byte: + // TODO: This is returned for the parquet INT96 type. We just + // treat it same as []byte (but AWS S3 treats it as a large int) + // - fix this later. + value = string(val[:]) + case int32: + value = int64(val) + if logicalType := se.GetLogicalType(); logicalType != nil { + if logicalType.IsSetDATE() { + value = sql.FormatSQLTimestamp(time.Unix(60*60*24*int64(val), 0).UTC()) + } + } else if se.GetConvertedType() == parquettypes.ConvertedType_DATE { + value = sql.FormatSQLTimestamp(time.Unix(60*60*24*int64(val), 0).UTC()) + } + case int64: + value = val + if logicalType := se.GetLogicalType(); logicalType != nil { + if ts := logicalType.GetTIMESTAMP(); ts != nil { + var duration time.Duration + // Only support UTC normalized timestamps. + if ts.IsAdjustedToUTC { + switch { + case ts.Unit.IsSetNANOS(): + duration = time.Duration(val) * time.Nanosecond + case ts.Unit.IsSetMILLIS(): + duration = time.Duration(val) * time.Millisecond + case ts.Unit.IsSetMICROS(): + duration = time.Duration(val) * time.Microsecond + default: + return nil, errors.New("Invalid LogicalType annotation found") + } + value = sql.FormatSQLTimestamp(time.Unix(0, 0).Add(duration)) + } + } else if se.GetConvertedType() == parquettypes.ConvertedType_TIMESTAMP_MILLIS { + duration := time.Duration(val) * time.Millisecond + value = sql.FormatSQLTimestamp(time.Unix(0, 0).Add(duration)) + } else if se.GetConvertedType() == parquettypes.ConvertedType_TIMESTAMP_MICROS { + duration := time.Duration(val) * time.Microsecond + value = sql.FormatSQLTimestamp(time.Unix(0, 0).Add(duration)) + } + } + case float32: + value = float64(val) + default: + value = v + } + return value, nil } diff --git a/internal/s3select/select.go b/internal/s3select/select.go index 6026bd45f..f6123a79e 100644 --- a/internal/s3select/select.go +++ b/internal/s3select/select.go @@ -214,10 +214,10 @@ type RequestProgress struct { // ScanRange represents the ScanRange parameter. type ScanRange struct { - // Start if byte offset form the start off the file. + // Start is the byte offset to read from (from the start of the file). Start *uint64 `xml:"Start"` - // End is the last byte that should be returned, if Start is set, - // or the offset from EOF to start reading if start is not present. + // End is the offset of the last byte that should be returned when Start + // is set, otherwise it is the offset from EOF to start reading. End *uint64 `xml:"End"` } @@ -362,21 +362,29 @@ func (s3Select *S3Select) getProgress() (bytesScanned, bytesProcessed int64) { // Open - opens S3 object by using callback for SQL selection query. // Currently CSV, JSON and Apache Parquet formats are supported. -func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadCloser, error)) error { - offset, end, err := s3Select.ScanRange.StartLen() +func (s3Select *S3Select) Open(rsc io.ReadSeekCloser) error { + offset, length, err := s3Select.ScanRange.StartLen() if err != nil { return err } + seekDirection := io.SeekStart + if offset < 0 { + seekDirection = io.SeekEnd + } switch s3Select.Input.format { case csvFormat: - rc, err := getReader(offset, end) + _, err = rsc.Seek(offset, seekDirection) if err != nil { return err } + var rc io.ReadCloser = rsc + if length != -1 { + rc = newLimitedReadCloser(rsc, length) + } s3Select.progressReader, err = newProgressReader(rc, s3Select.Input.CompressionType) if err != nil { - rc.Close() + rsc.Close() return err } @@ -405,14 +413,18 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos } return nil case jsonFormat: - rc, err := getReader(offset, end) + _, err = rsc.Seek(offset, seekDirection) if err != nil { return err } + var rc io.ReadCloser = rsc + if length != -1 { + rc = newLimitedReadCloser(rsc, length) + } s3Select.progressReader, err = newProgressReader(rc, s3Select.Input.CompressionType) if err != nil { - rc.Close() + rsc.Close() return err } @@ -431,12 +443,12 @@ func (s3Select *S3Select) Open(getReader func(offset, length int64) (io.ReadClos if !strings.EqualFold(os.Getenv("MINIO_API_SELECT_PARQUET"), "on") { return errors.New("parquet format parsing not enabled on server") } - if offset != 0 || end != -1 { + if offset != 0 || length != -1 { // Offsets do not make sense in parquet files. return errors.New("parquet format does not support offsets") } var err error - s3Select.recordReader, err = parquet.NewReader(getReader, &s3Select.Input.ParquetArgs) + s3Select.recordReader, err = parquet.NewParquetReader(rsc, &s3Select.Input.ParquetArgs) return err } @@ -657,3 +669,93 @@ func NewS3Select(r io.Reader) (*S3Select, error) { return s3Select, nil } + +////////////////// +// Helpers +///////////////// + +// limitedReadCloser is like io.LimitedReader, but also implements io.Closer. +type limitedReadCloser struct { + io.LimitedReader + io.Closer +} + +func newLimitedReadCloser(r io.ReadCloser, n int64) *limitedReadCloser { + return &limitedReadCloser{ + LimitedReader: io.LimitedReader{R: r, N: n}, + Closer: r, + } +} + +// ObjectSegmentReaderFn is a function that returns a reader for a contiguous +// suffix segment of an object starting at the given (non-negative) offset. +type ObjectSegmentReaderFn func(offset int64) (io.ReadCloser, error) + +// ObjectReadSeekCloser implements ReadSeekCloser interface for reading objects. +// It uses a function that returns a io.ReadCloser for the object. +type ObjectReadSeekCloser struct { + segmentReader ObjectSegmentReaderFn + + size int64 // actual object size regardless of compression/encryption + offset int64 + reader io.ReadCloser +} + +// NewObjectReadSeekCloser creates a new ObjectReadSeekCloser. +func NewObjectReadSeekCloser(segmentReader ObjectSegmentReaderFn, actualSize int64) *ObjectReadSeekCloser { + return &ObjectReadSeekCloser{ + segmentReader: segmentReader, + size: actualSize, + offset: 0, + reader: nil, + } +} + +// Seek call to implement io.Seeker +func (rsc *ObjectReadSeekCloser) Seek(offset int64, whence int) (int64, error) { + // fmt.Printf("actual: %v offset: %v (%v) whence: %v\n", rsc.size, offset, rsc.offset, whence) + switch whence { + case io.SeekStart: + rsc.offset = offset + case io.SeekCurrent: + rsc.offset += offset + case io.SeekEnd: + rsc.offset = rsc.size + offset + } + if rsc.offset < 0 { + return rsc.offset, errors.New("seek to invalid negative offset") + } + if rsc.offset >= rsc.size { + return rsc.offset, errors.New("seek past end of object") + } + if rsc.reader != nil { + _ = rsc.reader.Close() + rsc.reader = nil + } + return rsc.offset, nil +} + +// Read call to implement io.Reader +func (rsc *ObjectReadSeekCloser) Read(p []byte) (n int, err error) { + if rsc.reader == nil { + rsc.reader, err = rsc.segmentReader(rsc.offset) + if err != nil { + return 0, err + } + } + return rsc.reader.Read(p) +} + +// Close call to implement io.Closer. Calling Read/Seek after Close reopens the +// object for reading and a subsequent Close call is required to ensure +// resources are freed. +func (rsc *ObjectReadSeekCloser) Close() error { + if rsc.reader != nil { + err := rsc.reader.Close() + if err == nil { + rsc.reader = nil + } + return err + } + return nil +} diff --git a/internal/s3select/select_benchmark_test.go b/internal/s3select/select_benchmark_test.go index c4a6bdb73..d21c8325a 100644 --- a/internal/s3select/select_benchmark_test.go +++ b/internal/s3select/select_benchmark_test.go @@ -20,8 +20,6 @@ package s3select import ( "bytes" "encoding/csv" - "io" - "io/ioutil" "math/rand" "net/http" "strconv" @@ -112,9 +110,7 @@ func benchmarkSelect(b *testing.B, count int, query string) { b.Fatal(err) } - if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewReader(csvData)), nil - }); err != nil { + if err = s3Select.Open(newBytesRSC(csvData)); err != nil { b.Fatal(err) } diff --git a/internal/s3select/select_test.go b/internal/s3select/select_test.go index da95e6bd5..8dbb20181 100644 --- a/internal/s3select/select_test.go +++ b/internal/s3select/select_test.go @@ -20,7 +20,6 @@ package s3select import ( "bytes" "encoding/xml" - "errors" "fmt" "io" "io/ioutil" @@ -35,6 +34,22 @@ import ( "github.com/minio/simdjson-go" ) +func newStringRSC(s string) io.ReadSeekCloser { + return newBytesRSC([]byte(s)) +} + +func newBytesRSC(b []byte) io.ReadSeekCloser { + r := bytes.NewReader(b) + segmentReader := func(offset int64) (io.ReadCloser, error) { + _, err := r.Seek(offset, io.SeekStart) + if err != nil { + return nil, err + } + return io.NopCloser(r), nil + } + return NewObjectReadSeekCloser(segmentReader, int64(len(b))) +} + type testResponseWriter struct { statusCode int response []byte @@ -608,13 +623,11 @@ func TestJSONQueries(t *testing.T) { t.Fatal(err) } - if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { - in := input - if len(testCase.withJSON) > 0 { - in = testCase.withJSON - } - return ioutil.NopCloser(bytes.NewBufferString(in)), nil - }); err != nil { + in := input + if len(testCase.withJSON) > 0 { + in = testCase.withJSON + } + if err = s3Select.Open(newStringRSC(in)); err != nil { t.Fatal(err) } @@ -656,13 +669,11 @@ func TestJSONQueries(t *testing.T) { t.Fatal(err) } - if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { - in := input - if len(testCase.withJSON) > 0 { - in = testCase.withJSON - } - return ioutil.NopCloser(bytes.NewBufferString(in)), nil - }); err != nil { + in := input + if len(testCase.withJSON) > 0 { + in = testCase.withJSON + } + if err = s3Select.Open(newStringRSC(in)); err != nil { t.Fatal(err) } @@ -743,9 +754,7 @@ func TestCSVQueries(t *testing.T) { t.Fatal(err) } - if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewBufferString(input)), nil - }); err != nil { + if err = s3Select.Open(newStringRSC(input)); err != nil { t.Fatal(err) } @@ -928,9 +937,7 @@ func TestCSVQueries2(t *testing.T) { t.Fatal(err) } - if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewBuffer(testCase.input)), nil - }); err != nil { + if err = s3Select.Open(newBytesRSC(testCase.input)); err != nil { t.Fatal(err) } @@ -1074,9 +1081,7 @@ true`, t.Fatal(err) } - if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewBufferString(input)), nil - }); err != nil { + if err = s3Select.Open(newStringRSC(input)); err != nil { t.Fatal(err) } @@ -1220,9 +1225,7 @@ func TestCSVInput(t *testing.T) { t.Fatal(err) } - if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewReader(csvData)), nil - }); err != nil { + if err = s3Select.Open(newBytesRSC(csvData)); err != nil { t.Fatal(err) } @@ -1342,9 +1345,7 @@ func TestJSONInput(t *testing.T) { t.Fatal(err) } - if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { - return ioutil.NopCloser(bytes.NewReader(jsonData)), nil - }); err != nil { + if err = s3Select.Open(newBytesRSC(jsonData)); err != nil { t.Fatal(err) } @@ -1616,37 +1617,7 @@ func TestCSVRanges(t *testing.T) { return } - if err = s3Select.Open(func(offset, length int64) (io.ReadCloser, error) { - in := testCase.input - if offset != 0 || length != -1 { - // Copy from SelectObjectContentHandler - isSuffixLength := false - if offset < 0 { - isSuffixLength = true - } - - if length > 0 { - length-- - } - - rs := &httpRangeSpec{ - IsSuffixLength: isSuffixLength, - Start: offset, - End: offset + length, - } - if length == -1 { - rs.End = -1 - } - t.Log("input, offset:", offset, "length:", length, "size:", len(in)) - offset, length, err = rs.GetOffsetLength(int64(len(in))) - if err != nil { - return nil, err - } - t.Log("rs:", *rs, "offset:", offset, "length:", length) - in = in[offset : offset+length] - } - return ioutil.NopCloser(bytes.NewBuffer(in)), nil - }); err != nil { + if err = s3Select.Open(newBytesRSC(testCase.input)); err != nil { if !testCase.wantErr { t.Fatal(err) } @@ -1741,27 +1712,10 @@ func TestParquetInput(t *testing.T) { for i, testCase := range testTable { t.Run(fmt.Sprint(i), func(t *testing.T) { - getReader := func(offset int64, length int64) (io.ReadCloser, error) { - testdataFile := "testdata/testdata.parquet" - file, err := os.Open(testdataFile) - if err != nil { - return nil, err - } - - fi, err := file.Stat() - if err != nil { - return nil, err - } - - if offset < 0 { - offset = fi.Size() + offset - } - - if _, err = file.Seek(offset, io.SeekStart); err != nil { - return nil, err - } - - return file, nil + testdataFile := "testdata/testdata.parquet" + file, err := os.Open(testdataFile) + if err != nil { + t.Fatal(err) } s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML)) @@ -1769,10 +1723,12 @@ func TestParquetInput(t *testing.T) { t.Fatal(err) } - if err = s3Select.Open(getReader); err != nil { + if err = s3Select.Open(file); err != nil { t.Fatal(err) } + fmt.Printf("R: \nE: %s\n" /* string(w.response), */, string(testCase.expectedResult)) + w := &testResponseWriter{} s3Select.Evaluate(w) s3Select.Close() @@ -1862,27 +1818,10 @@ func TestParquetInputSchema(t *testing.T) { for i, testCase := range testTable { t.Run(fmt.Sprint(i), func(t *testing.T) { - getReader := func(offset int64, length int64) (io.ReadCloser, error) { - testdataFile := "testdata/lineitem_shipdate.parquet" - file, err := os.Open(testdataFile) - if err != nil { - return nil, err - } - - fi, err := file.Stat() - if err != nil { - return nil, err - } - - if offset < 0 { - offset = fi.Size() + offset - } - - if _, err = file.Seek(offset, io.SeekStart); err != nil { - return nil, err - } - - return file, nil + testdataFile := "testdata/lineitem_shipdate.parquet" + file, err := os.Open(testdataFile) + if err != nil { + t.Fatal(err) } s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML)) @@ -1890,7 +1829,7 @@ func TestParquetInputSchema(t *testing.T) { t.Fatal(err) } - if err = s3Select.Open(getReader); err != nil { + if err = s3Select.Open(file); err != nil { t.Fatal(err) } @@ -1980,27 +1919,10 @@ func TestParquetInputSchemaCSV(t *testing.T) { for i, testCase := range testTable { t.Run(fmt.Sprint(i), func(t *testing.T) { - getReader := func(offset int64, length int64) (io.ReadCloser, error) { - testdataFile := "testdata/lineitem_shipdate.parquet" - file, err := os.Open(testdataFile) - if err != nil { - return nil, err - } - - fi, err := file.Stat() - if err != nil { - return nil, err - } - - if offset < 0 { - offset = fi.Size() + offset - } - - if _, err = file.Seek(offset, io.SeekStart); err != nil { - return nil, err - } - - return file, nil + testdataFile := "testdata/lineitem_shipdate.parquet" + file, err := os.Open(testdataFile) + if err != nil { + t.Fatal(err) } s3Select, err := NewS3Select(bytes.NewReader(testCase.requestXML)) @@ -2008,7 +1930,7 @@ func TestParquetInputSchemaCSV(t *testing.T) { t.Fatal(err) } - if err = s3Select.Open(getReader); err != nil { + if err = s3Select.Open(file); err != nil { t.Fatal(err) } @@ -2037,76 +1959,3 @@ func TestParquetInputSchemaCSV(t *testing.T) { }) } } - -// httpRangeSpec represents a range specification as supported by S3 GET -// object request. -// -// Case 1: Not present -> represented by a nil RangeSpec -// Case 2: bytes=1-10 (absolute start and end offsets) -> RangeSpec{false, 1, 10} -// Case 3: bytes=10- (absolute start offset with end offset unspecified) -> RangeSpec{false, 10, -1} -// Case 4: bytes=-30 (suffix length specification) -> RangeSpec{true, -30, -1} -type httpRangeSpec struct { - // Does the range spec refer to a suffix of the object? - IsSuffixLength bool - - // Start and end offset specified in range spec - Start, End int64 -} - -func (h *httpRangeSpec) GetLength(resourceSize int64) (rangeLength int64, err error) { - switch { - case resourceSize < 0: - return 0, errors.New("Resource size cannot be negative") - - case h == nil: - rangeLength = resourceSize - - case h.IsSuffixLength: - specifiedLen := -h.Start - rangeLength = specifiedLen - if specifiedLen > resourceSize { - rangeLength = resourceSize - } - - case h.Start >= resourceSize: - return 0, errors.New("errInvalidRange") - - case h.End > -1: - end := h.End - if resourceSize <= end { - end = resourceSize - 1 - } - rangeLength = end - h.Start + 1 - - case h.End == -1: - rangeLength = resourceSize - h.Start - - default: - return 0, errors.New("Unexpected range specification case") - } - - return rangeLength, nil -} - -// GetOffsetLength computes the start offset and length of the range -// given the size of the resource -func (h *httpRangeSpec) GetOffsetLength(resourceSize int64) (start, length int64, err error) { - if h == nil { - // No range specified, implies whole object. - return 0, resourceSize, nil - } - - length, err = h.GetLength(resourceSize) - if err != nil { - return 0, 0, err - } - - start = h.Start - if h.IsSuffixLength { - start = resourceSize + h.Start - if start < 0 { - start = 0 - } - } - return start, length, nil -}