mirror of
				https://github.com/minio/minio.git
				synced 2025-10-31 08:11:19 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			892 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			892 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright (c) 2015-2021 MinIO, Inc.
 | |
| //
 | |
| // This file is part of MinIO Object Storage stack
 | |
| //
 | |
| // This program is free software: you can redistribute it and/or modify
 | |
| // it under the terms of the GNU Affero General Public License as published by
 | |
| // the Free Software Foundation, either version 3 of the License, or
 | |
| // (at your option) any later version.
 | |
| //
 | |
| // This program is distributed in the hope that it will be useful
 | |
| // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| // GNU Affero General Public License for more details.
 | |
| //
 | |
| // You should have received a copy of the GNU Affero General Public License
 | |
| // along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| package hdfs
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"net"
 | |
| 	"net/http"
 | |
| 	"os"
 | |
| 	"os/user"
 | |
| 	"path"
 | |
| 	"sort"
 | |
| 	"strings"
 | |
| 	"syscall"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/colinmarc/hdfs/v2"
 | |
| 	"github.com/colinmarc/hdfs/v2/hadoopconf"
 | |
| 	krb "github.com/jcmturner/gokrb5/v8/client"
 | |
| 	"github.com/jcmturner/gokrb5/v8/config"
 | |
| 	"github.com/jcmturner/gokrb5/v8/credentials"
 | |
| 	"github.com/jcmturner/gokrb5/v8/keytab"
 | |
| 	"github.com/minio/cli"
 | |
| 	"github.com/minio/minio-go/v7/pkg/s3utils"
 | |
| 	minio "github.com/minio/minio/cmd"
 | |
| 	"github.com/minio/minio/cmd/logger"
 | |
| 	"github.com/minio/minio/pkg/auth"
 | |
| 	"github.com/minio/minio/pkg/env"
 | |
| 	"github.com/minio/minio/pkg/madmin"
 | |
| 	xnet "github.com/minio/minio/pkg/net"
 | |
| )
 | |
| 
 | |
const (
	// hdfsSeparator is the path separator used for HDFS keys, aliased
	// from MinIO's canonical slash separator.
	hdfsSeparator = minio.SlashSeparator
)
 | |
| 
 | |
// init registers the HDFS gateway subcommand (with its CLI help template)
// so that `minio gateway hdfs` is available at startup.
func init() {
	const hdfsGatewayTemplate = `NAME:
  {{.HelpName}} - {{.Usage}}

USAGE:
  {{.HelpName}} {{if .VisibleFlags}}[FLAGS]{{end}} HDFS-NAMENODE [HDFS-NAMENODE...]
{{if .VisibleFlags}}
FLAGS:
  {{range .VisibleFlags}}{{.}}
  {{end}}{{end}}
HDFS-NAMENODE:
  HDFS namenode URI

EXAMPLES:
  1. Start minio gateway server for HDFS backend
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_USER{{.AssignmentOperator}}accesskey
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_PASSWORD{{.AssignmentOperator}}secretkey
     {{.Prompt}} {{.HelpName}} hdfs://namenode:8200

  2. Start minio gateway server for HDFS with edge caching enabled
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_USER{{.AssignmentOperator}}accesskey
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_ROOT_PASSWORD{{.AssignmentOperator}}secretkey
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_DRIVES{{.AssignmentOperator}}"/mnt/drive1,/mnt/drive2,/mnt/drive3,/mnt/drive4"
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_EXCLUDE{{.AssignmentOperator}}"bucket1/*,*.png"
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_QUOTA{{.AssignmentOperator}}90
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_AFTER{{.AssignmentOperator}}3
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_LOW{{.AssignmentOperator}}75
     {{.Prompt}} {{.EnvVarSetCommand}} MINIO_CACHE_WATERMARK_HIGH{{.AssignmentOperator}}85
     {{.Prompt}} {{.HelpName}} hdfs://namenode:8200
`

	// Hook the command into MinIO's gateway command registry.
	minio.RegisterGatewayCommand(cli.Command{
		Name:               minio.HDFSBackendGateway,
		Usage:              "Hadoop Distributed File System (HDFS)",
		Action:             hdfsGatewayMain,
		CustomHelpTemplate: hdfsGatewayTemplate,
		HideHelpCommand:    true,
	})
}
 | |
| 
 | |
| // Handler for 'minio gateway hdfs' command line.
 | |
| func hdfsGatewayMain(ctx *cli.Context) {
 | |
| 	// Validate gateway arguments.
 | |
| 	if ctx.Args().First() == "help" {
 | |
| 		cli.ShowCommandHelpAndExit(ctx, minio.HDFSBackendGateway, 1)
 | |
| 	}
 | |
| 
 | |
| 	minio.StartGateway(ctx, &HDFS{args: ctx.Args()})
 | |
| }
 | |
| 
 | |
// HDFS implements Gateway.
type HDFS struct {
	// args holds the hdfs:// namenode URIs passed on the command line.
	args []string
}
 | |
| 
 | |
// Name implements the Gateway interface; it returns the registered
// HDFS backend identifier.
func (g *HDFS) Name() string {
	return minio.HDFSBackendGateway
}
 | |
| 
 | |
// getKerberosClient builds a Kerberos client for HDFS authentication.
//
// Configuration is taken from the environment:
//   - KRB5_CONFIG: krb5.conf location (default /etc/krb5.conf)
//   - KRB5KEYTAB + KRB5USERNAME + KRB5REALM: keytab-based login
//   - KRB5CCNAME: credential-cache login (default /tmp/krb5cc_<uid>)
//
// Keytab login takes precedence whenever KRB5KEYTAB is set.
func getKerberosClient() (*krb.Client, error) {
	cfg, err := config.Load(env.Get("KRB5_CONFIG", "/etc/krb5.conf"))
	if err != nil {
		return nil, err
	}

	// The current OS user's uid is needed for the default ccache path below.
	u, err := user.Current()
	if err != nil {
		return nil, err
	}

	keytabPath := env.Get("KRB5KEYTAB", "")
	if keytabPath != "" {
		kt, err := keytab.Load(keytabPath)
		if err != nil {
			return nil, err
		}

		// Keytab login needs an explicit principal: both username and realm.
		username := env.Get("KRB5USERNAME", "")
		realm := env.Get("KRB5REALM", "")
		if username == "" || realm == "" {
			return nil, errors.New("empty KRB5USERNAME or KRB5REALM")

		}

		return krb.NewWithKeytab(username, realm, kt, cfg), nil
	}

	// Determine the ccache location from the environment, falling back to the default location.
	ccachePath := env.Get("KRB5CCNAME", fmt.Sprintf("/tmp/krb5cc_%s", u.Uid))
	if strings.Contains(ccachePath, ":") {
		// Only the FILE: ccache type is supported; strip its prefix.
		// Any other "TYPE:" prefix (DIR:, KEYRING:, ...) is rejected.
		if strings.HasPrefix(ccachePath, "FILE:") {
			ccachePath = strings.TrimPrefix(ccachePath, "FILE:")
		} else {
			return nil, fmt.Errorf("unable to use kerberos ccache: %s", ccachePath)
		}
	}

	ccache, err := credentials.LoadCCache(ccachePath)
	if err != nil {
		return nil, err
	}

	return krb.NewFromCCache(ccache, cfg)
}
 | |
| 
 | |
// NewGatewayLayer returns hdfs gatewaylayer.
//
// It loads hdfs client options from the Hadoop environment configuration,
// falls back to the command-line hdfs:// URIs for namenode addresses,
// selects Kerberos or simple-user authentication, and pre-creates the
// MinIO tmp bucket directory used for staging uploads.
func (g *HDFS) NewGatewayLayer(creds auth.Credentials) (minio.ObjectLayer, error) {
	// Shared dialer for both namenode and datanode connections.
	dialFunc := (&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
		DualStack: true,
	}).DialContext

	hconfig, err := hadoopconf.LoadFromEnvironment()
	if err != nil {
		return nil, err
	}

	opts := hdfs.ClientOptionsFromConf(hconfig)
	opts.NamenodeDialFunc = dialFunc
	opts.DatanodeDialFunc = dialFunc

	// No addresses found in the Hadoop config, load them from the command line.
	var commonPath string
	if len(opts.Addresses) == 0 {
		var addresses []string
		for _, s := range g.args {
			u, err := xnet.ParseURL(s)
			if err != nil {
				return nil, err
			}
			if u.Scheme != "hdfs" {
				return nil, fmt.Errorf("unsupported scheme %s, only supports hdfs://", u)
			}
			// All namenode URIs must agree on a single root path; remember
			// the first one seen and reject any later mismatch.
			if commonPath != "" && commonPath != u.Path {
				return nil, fmt.Errorf("all namenode paths should be same %s", g.args)
			}
			if commonPath == "" {
				commonPath = u.Path
			}
			addresses = append(addresses, u.Host)
		}
		opts.Addresses = addresses
	}

	u, err := user.Current()
	if err != nil {
		return nil, fmt.Errorf("unable to lookup local user: %s", err)
	}

	// NOTE(review): a non-nil KerberosClient here presumably means the
	// Hadoop config requested Kerberos auth, and the placeholder is then
	// replaced with a fully configured client — confirm against the
	// colinmarc/hdfs ClientOptionsFromConf docs. Otherwise simple auth
	// with HADOOP_USER_NAME (or the local username) is used.
	if opts.KerberosClient != nil {
		opts.KerberosClient, err = getKerberosClient()
		if err != nil {
			return nil, fmt.Errorf("unable to initialize kerberos client: %s", err)
		}
	} else {
		opts.User = env.Get("HADOOP_USER_NAME", u.Username)
	}

	clnt, err := hdfs.NewClient(opts)
	if err != nil {
		return nil, fmt.Errorf("unable to initialize hdfsClient: %v", err)
	}

	// Ensure the staging area for uploads exists up front.
	if err = clnt.MkdirAll(minio.PathJoin(commonPath, hdfsSeparator, minioMetaTmpBucket), os.FileMode(0755)); err != nil {
		return nil, err
	}

	return &hdfsObjects{clnt: clnt, subPath: commonPath, listPool: minio.NewTreeWalkPool(time.Minute * 30)}, nil
}
 | |
| 
 | |
// Production - hdfs gateway is production ready.
func (g *HDFS) Production() bool {
	return true
}
 | |
| 
 | |
// Shutdown closes the underlying HDFS client connection; ctx is unused.
func (n *hdfsObjects) Shutdown(ctx context.Context) error {
	return n.clnt.Close()
}
 | |
| 
 | |
// LocalStorageInfo returns the same view as StorageInfo — the gateway has
// no separate per-node storage to report.
func (n *hdfsObjects) LocalStorageInfo(ctx context.Context) (si minio.StorageInfo, errs []error) {
	return n.StorageInfo(ctx)
}
 | |
| 
 | |
| func (n *hdfsObjects) StorageInfo(ctx context.Context) (si minio.StorageInfo, errs []error) {
 | |
| 	fsInfo, err := n.clnt.StatFs()
 | |
| 	if err != nil {
 | |
| 		return minio.StorageInfo{}, []error{err}
 | |
| 	}
 | |
| 	si.Disks = []madmin.Disk{{
 | |
| 		UsedSpace: fsInfo.Used,
 | |
| 	}}
 | |
| 	si.Backend.Type = madmin.Gateway
 | |
| 	si.Backend.GatewayOnline = true
 | |
| 	return si, nil
 | |
| }
 | |
| 
 | |
// hdfsObjects implements gateway for Minio and S3 compatible object storage servers.
type hdfsObjects struct {
	minio.GatewayUnsupported
	// clnt is the HDFS client used for all namenode/datanode operations.
	clnt *hdfs.Client
	// subPath is the common root path shared by all namenode URIs.
	subPath string
	// listPool caches tree-walk goroutines for paginated listings.
	listPool *minio.TreeWalkPool
}
 | |
| 
 | |
// hdfsToObjectErr translates an HDFS client error into the matching MinIO
// object-layer error. params are positional — bucket, then object, then
// uploadID — and trailing ones may be omitted. A nil error passes through.
func hdfsToObjectErr(ctx context.Context, err error, params ...string) error {
	if err == nil {
		return nil
	}
	bucket := ""
	object := ""
	uploadID := ""
	// Unpack the optional positional params; fallthrough fills the
	// shorter fields from the longer cases.
	switch len(params) {
	case 3:
		uploadID = params[2]
		fallthrough
	case 2:
		object = params[1]
		fallthrough
	case 1:
		bucket = params[0]
	}

	switch {
	case os.IsNotExist(err):
		// Most specific identifier wins: uploadID, then object, then bucket.
		if uploadID != "" {
			return minio.InvalidUploadID{
				UploadID: uploadID,
			}
		}
		if object != "" {
			return minio.ObjectNotFound{Bucket: bucket, Object: object}
		}
		return minio.BucketNotFound{Bucket: bucket}
	case os.IsExist(err):
		if object != "" {
			return minio.PrefixAccessDenied{Bucket: bucket, Object: object}
		}
		return minio.BucketAlreadyOwnedByYou{Bucket: bucket}
	case errors.Is(err, syscall.ENOTEMPTY):
		// Removing a non-empty directory maps to "bucket not empty" (or
		// prefix access denied when an object path was involved).
		if object != "" {
			return minio.PrefixAccessDenied{Bucket: bucket, Object: object}
		}
		return minio.BucketNotEmpty{Bucket: bucket}
	default:
		// Unrecognized errors are logged and returned unchanged.
		logger.LogIf(ctx, err)
		return err
	}
}
 | |
| 
 | |
| // hdfsIsValidBucketName verifies whether a bucket name is valid.
 | |
| func hdfsIsValidBucketName(bucket string) bool {
 | |
| 	return s3utils.CheckValidBucketNameStrict(bucket) == nil
 | |
| }
 | |
| 
 | |
| func (n *hdfsObjects) hdfsPathJoin(args ...string) string {
 | |
| 	return minio.PathJoin(append([]string{n.subPath, hdfsSeparator}, args...)...)
 | |
| }
 | |
| 
 | |
| func (n *hdfsObjects) DeleteBucket(ctx context.Context, bucket string, forceDelete bool) error {
 | |
| 	if !hdfsIsValidBucketName(bucket) {
 | |
| 		return minio.BucketNameInvalid{Bucket: bucket}
 | |
| 	}
 | |
| 	if forceDelete {
 | |
| 		return hdfsToObjectErr(ctx, n.clnt.RemoveAll(n.hdfsPathJoin(bucket)), bucket)
 | |
| 	}
 | |
| 	return hdfsToObjectErr(ctx, n.clnt.Remove(n.hdfsPathJoin(bucket)), bucket)
 | |
| }
 | |
| 
 | |
| func (n *hdfsObjects) MakeBucketWithLocation(ctx context.Context, bucket string, opts minio.BucketOptions) error {
 | |
| 	if opts.LockEnabled || opts.VersioningEnabled {
 | |
| 		return minio.NotImplemented{}
 | |
| 	}
 | |
| 
 | |
| 	if !hdfsIsValidBucketName(bucket) {
 | |
| 		return minio.BucketNameInvalid{Bucket: bucket}
 | |
| 	}
 | |
| 	return hdfsToObjectErr(ctx, n.clnt.Mkdir(n.hdfsPathJoin(bucket), os.FileMode(0755)), bucket)
 | |
| }
 | |
| 
 | |
| func (n *hdfsObjects) GetBucketInfo(ctx context.Context, bucket string) (bi minio.BucketInfo, err error) {
 | |
| 	fi, err := n.clnt.Stat(n.hdfsPathJoin(bucket))
 | |
| 	if err != nil {
 | |
| 		return bi, hdfsToObjectErr(ctx, err, bucket)
 | |
| 	}
 | |
| 	// As hdfs.Stat() doesn't carry anything other than ModTime(), use ModTime() as CreatedTime.
 | |
| 	return minio.BucketInfo{
 | |
| 		Name:    bucket,
 | |
| 		Created: fi.ModTime(),
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (n *hdfsObjects) ListBuckets(ctx context.Context) (buckets []minio.BucketInfo, err error) {
 | |
| 	entries, err := n.clnt.ReadDir(n.hdfsPathJoin())
 | |
| 	if err != nil {
 | |
| 		logger.LogIf(ctx, err)
 | |
| 		return nil, hdfsToObjectErr(ctx, err)
 | |
| 	}
 | |
| 
 | |
| 	for _, entry := range entries {
 | |
| 		// Ignore all reserved bucket names and invalid bucket names.
 | |
| 		if isReservedOrInvalidBucket(entry.Name(), false) {
 | |
| 			continue
 | |
| 		}
 | |
| 		buckets = append(buckets, minio.BucketInfo{
 | |
| 			Name: entry.Name(),
 | |
| 			// As hdfs.Stat() doesnt carry CreatedTime, use ModTime() as CreatedTime.
 | |
| 			Created: entry.ModTime(),
 | |
| 		})
 | |
| 	}
 | |
| 
 | |
| 	// Sort bucket infos by bucket name.
 | |
| 	sort.Sort(byBucketName(buckets))
 | |
| 	return buckets, nil
 | |
| }
 | |
| 
 | |
// isLeafDir reports whether leafPath is an empty directory ("leaf dir"),
// delegating to isObjectDir with a background context.
func (n *hdfsObjects) isLeafDir(bucket, leafPath string) bool {
	return n.isObjectDir(context.Background(), bucket, leafPath)
}
 | |
| 
 | |
| func (n *hdfsObjects) isLeaf(bucket, leafPath string) bool {
 | |
| 	return !strings.HasSuffix(leafPath, hdfsSeparator)
 | |
| }
 | |
| 
 | |
// listDirFactory returns a minio.ListDirFunc closure used by the tree
// walker to enumerate one directory level at a time.
func (n *hdfsObjects) listDirFactory() minio.ListDirFunc {
	// listDir - lists all the entries at a given prefix and given entry in the prefix.
	listDir := func(bucket, prefixDir, prefixEntry string) (emptyDir bool, entries []string, delayIsLeaf bool) {
		f, err := n.clnt.Open(n.hdfsPathJoin(bucket, prefixDir))
		if err != nil {
			// A missing directory is not an error worth logging;
			// anything else is logged before returning empty results.
			if os.IsNotExist(err) {
				err = nil
			}
			logger.LogIf(minio.GlobalContext, err)
			return
		}
		defer f.Close()
		// Readdir(0) reads the entire directory in one call.
		fis, err := f.Readdir(0)
		if err != nil {
			logger.LogIf(minio.GlobalContext, err)
			return
		}
		if len(fis) == 0 {
			return true, nil, false
		}
		for _, fi := range fis {
			// Directories get a trailing separator so isLeaf can
			// distinguish them later.
			if fi.IsDir() {
				entries = append(entries, fi.Name()+hdfsSeparator)
			} else {
				entries = append(entries, fi.Name())
			}
		}
		// Filter entries down to the requested prefix; delayIsLeaf defers
		// the leaf check to the walker when it can be decided later.
		entries, delayIsLeaf = minio.FilterListEntries(bucket, prefixDir, entries, prefixEntry, n.isLeaf)
		return false, entries, delayIsLeaf
	}

	// Return list factory instance.
	return listDir
}
 | |
| 
 | |
// ListObjects lists all blobs in HDFS bucket filtered by prefix.
//
// It first stats/lists the target path once into a local fileInfos cache;
// if the prefix names a single file it is returned directly, otherwise the
// shared tree-walk machinery is used with getObjectInfo closures that
// serve (and lazily refill) the cache.
func (n *hdfsObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi minio.ListObjectsInfo, err error) {
	// Cache of absolute HDFS path -> FileInfo, filled per directory.
	fileInfos := make(map[string]os.FileInfo)
	targetPath := n.hdfsPathJoin(bucket, prefix)

	var targetFileInfo os.FileInfo

	if targetFileInfo, err = n.populateDirectoryListing(targetPath, fileInfos); err != nil {
		return loi, hdfsToObjectErr(ctx, err, bucket)
	}

	// If the user is trying to list a single file, bypass the entire directory-walking code below
	// and just return the single file's information.
	if !targetFileInfo.IsDir() {
		return minio.ListObjectsInfo{
			IsTruncated: false,
			NextMarker:  "",
			Objects: []minio.ObjectInfo{
				fileInfoToObjectInfo(bucket, prefix, targetFileInfo),
			},
			Prefixes: []string{},
		}, nil
	}

	getObjectInfo := func(ctx context.Context, bucket, entry string) (minio.ObjectInfo, error) {
		filePath := path.Clean(n.hdfsPathJoin(bucket, entry))
		fi, ok := fileInfos[filePath]

		// If the file info is not known, this may be a recursive listing and filePath is a
		// child of a sub-directory. In this case, obtain that sub-directory's listing.
		if !ok {
			parentPath := path.Dir(filePath)

			if _, err := n.populateDirectoryListing(parentPath, fileInfos); err != nil {
				return minio.ObjectInfo{}, hdfsToObjectErr(ctx, err, bucket)
			}

			fi, ok = fileInfos[filePath]

			if !ok {
				err = fmt.Errorf("could not get FileInfo for path '%s'", filePath)
				return minio.ObjectInfo{}, hdfsToObjectErr(ctx, err, bucket, entry)
			}
		}

		objectInfo := fileInfoToObjectInfo(bucket, entry, fi)

		// Evict served entries so the cache doesn't grow with the walk.
		delete(fileInfos, filePath)

		return objectInfo, nil
	}

	return minio.ListObjects(ctx, n, bucket, prefix, marker, delimiter, maxKeys, n.listPool, n.listDirFactory(), n.isLeaf, n.isLeafDir, getObjectInfo, getObjectInfo)
}
 | |
| 
 | |
// fileInfoToObjectInfo converts an os.FileInfo from the HDFS client into a
// minio.ObjectInfo for the given bucket and entry name.
// NOTE(review): the type assertion assumes fi is always a *hdfs.FileInfo
// (as produced by the hdfs client); it would panic for any other
// implementation — confirm all callers pass hdfs-sourced FileInfos.
func fileInfoToObjectInfo(bucket string, entry string, fi os.FileInfo) minio.ObjectInfo {
	return minio.ObjectInfo{
		Bucket:  bucket,
		Name:    entry,
		ModTime: fi.ModTime(),
		Size:    fi.Size(),
		IsDir:   fi.IsDir(),
		AccTime: fi.(*hdfs.FileInfo).AccessTime(),
	}
}
 | |
| 
 | |
| // Lists a path's direct, first-level entries and populates them in the `fileInfos` cache which maps
 | |
| // a path entry to an `os.FileInfo`. It also saves the listed path's `os.FileInfo` in the cache.
 | |
| func (n *hdfsObjects) populateDirectoryListing(filePath string, fileInfos map[string]os.FileInfo) (os.FileInfo, error) {
 | |
| 	dirReader, err := n.clnt.Open(filePath)
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	dirStat := dirReader.Stat()
 | |
| 	key := path.Clean(filePath)
 | |
| 
 | |
| 	if !dirStat.IsDir() {
 | |
| 		return dirStat, nil
 | |
| 	}
 | |
| 
 | |
| 	fileInfos[key] = dirStat
 | |
| 	infos, err := dirReader.Readdir(0)
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	for _, fileInfo := range infos {
 | |
| 		filePath := minio.PathJoin(filePath, fileInfo.Name())
 | |
| 		fileInfos[filePath] = fileInfo
 | |
| 	}
 | |
| 
 | |
| 	return dirStat, nil
 | |
| }
 | |
| 
 | |
// deleteObject deletes a file path if its empty. If it's successfully deleted,
// it will recursively move up the tree, deleting empty parent directories
// until it finds one with files in it. Returns nil for a non-empty directory.
//
// basePath is the bucket root and acts as the recursion stop: it is never
// removed. deletePath is the object path to remove.
func (n *hdfsObjects) deleteObject(basePath, deletePath string) error {
	// Never delete the bucket root itself.
	if basePath == deletePath {
		return nil
	}

	// Attempt to remove path.
	if err := n.clnt.Remove(deletePath); err != nil {
		if errors.Is(err, syscall.ENOTEMPTY) {
			// Ignore errors if the directory is not empty. The server relies on
			// this functionality, and sometimes uses recursion that should not
			// error on parent directories.
			return nil
		}
		return err
	}

	// Trailing slash is removed when found to ensure
	// slashpath.Dir() to work as intended.
	deletePath = strings.TrimSuffix(deletePath, hdfsSeparator)
	deletePath = path.Dir(deletePath)

	// Delete parent directory. Errors for parent directories shouldn't trickle down.
	n.deleteObject(basePath, deletePath)

	return nil
}
 | |
| 
 | |
| // ListObjectsV2 lists all blobs in HDFS bucket filtered by prefix
 | |
| func (n *hdfsObjects) ListObjectsV2(ctx context.Context, bucket, prefix, continuationToken, delimiter string, maxKeys int,
 | |
| 	fetchOwner bool, startAfter string) (loi minio.ListObjectsV2Info, err error) {
 | |
| 	// fetchOwner is not supported and unused.
 | |
| 	marker := continuationToken
 | |
| 	if marker == "" {
 | |
| 		marker = startAfter
 | |
| 	}
 | |
| 	resultV1, err := n.ListObjects(ctx, bucket, prefix, marker, delimiter, maxKeys)
 | |
| 	if err != nil {
 | |
| 		return loi, err
 | |
| 	}
 | |
| 	return minio.ListObjectsV2Info{
 | |
| 		Objects:               resultV1.Objects,
 | |
| 		Prefixes:              resultV1.Prefixes,
 | |
| 		ContinuationToken:     continuationToken,
 | |
| 		NextContinuationToken: resultV1.NextMarker,
 | |
| 		IsTruncated:           resultV1.IsTruncated,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (n *hdfsObjects) DeleteObject(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (minio.ObjectInfo, error) {
 | |
| 	err := hdfsToObjectErr(ctx, n.deleteObject(n.hdfsPathJoin(bucket), n.hdfsPathJoin(bucket, object)), bucket, object)
 | |
| 	return minio.ObjectInfo{
 | |
| 		Bucket: bucket,
 | |
| 		Name:   object,
 | |
| 	}, err
 | |
| }
 | |
| 
 | |
| func (n *hdfsObjects) DeleteObjects(ctx context.Context, bucket string, objects []minio.ObjectToDelete, opts minio.ObjectOptions) ([]minio.DeletedObject, []error) {
 | |
| 	errs := make([]error, len(objects))
 | |
| 	dobjects := make([]minio.DeletedObject, len(objects))
 | |
| 	for idx, object := range objects {
 | |
| 		_, errs[idx] = n.DeleteObject(ctx, bucket, object.ObjectName, opts)
 | |
| 		if errs[idx] == nil {
 | |
| 			dobjects[idx] = minio.DeletedObject{
 | |
| 				ObjectName: object.ObjectName,
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return dobjects, errs
 | |
| }
 | |
| 
 | |
// GetObjectNInfo returns a streaming reader over the requested byte range
// of an object, together with its ObjectInfo.
//
// The object bytes are produced by a background goroutine writing into an
// io.Pipe; closing the returned reader (via the cleanup function) unblocks
// and terminates that goroutine on partial reads.
func (n *hdfsObjects) GetObjectNInfo(ctx context.Context, bucket, object string, rs *minio.HTTPRangeSpec, h http.Header, lockType minio.LockType, opts minio.ObjectOptions) (gr *minio.GetObjectReader, err error) {
	objInfo, err := n.GetObjectInfo(ctx, bucket, object, opts)
	if err != nil {
		return nil, err
	}

	// Resolve the requested range against the object's full size.
	var startOffset, length int64
	startOffset, length, err = rs.GetOffsetLength(objInfo.Size)
	if err != nil {
		return nil, err
	}

	pr, pw := io.Pipe()
	go func() {
		nerr := n.getObject(ctx, bucket, object, startOffset, length, pw, objInfo.ETag, opts)
		// Propagate any read error to the consumer of pr.
		pw.CloseWithError(nerr)
	}()

	// Setup cleanup function to cause the above go-routine to
	// exit in case of partial read
	pipeCloser := func() { pr.Close() }
	return minio.NewGetObjectReaderFromReader(pr, objInfo, opts, pipeCloser)

}
 | |
| 
 | |
| func (n *hdfsObjects) CopyObject(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject string, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (minio.ObjectInfo, error) {
 | |
| 	cpSrcDstSame := minio.IsStringEqual(n.hdfsPathJoin(srcBucket, srcObject), n.hdfsPathJoin(dstBucket, dstObject))
 | |
| 	if cpSrcDstSame {
 | |
| 		return n.GetObjectInfo(ctx, srcBucket, srcObject, minio.ObjectOptions{})
 | |
| 	}
 | |
| 
 | |
| 	return n.PutObject(ctx, dstBucket, dstObject, srcInfo.PutObjReader, minio.ObjectOptions{
 | |
| 		ServerSideEncryption: dstOpts.ServerSideEncryption,
 | |
| 		UserDefined:          srcInfo.UserDefined,
 | |
| 	})
 | |
| }
 | |
| 
 | |
// getObject streams [startOffset, startOffset+length) of bucket/key into
// writer. etag and opts are currently unused. Errors are translated via
// hdfsToObjectErr.
func (n *hdfsObjects) getObject(ctx context.Context, bucket, key string, startOffset, length int64, writer io.Writer, etag string, opts minio.ObjectOptions) error {
	// Verify the bucket exists before attempting the object read.
	if _, err := n.clnt.Stat(n.hdfsPathJoin(bucket)); err != nil {
		return hdfsToObjectErr(ctx, err, bucket)
	}
	rd, err := n.clnt.Open(n.hdfsPathJoin(bucket, key))
	if err != nil {
		return hdfsToObjectErr(ctx, err, bucket, key)
	}
	defer rd.Close()
	_, err = io.Copy(writer, io.NewSectionReader(rd, startOffset, length))
	if err == io.ErrClosedPipe {
		// hdfs library doesn't send EOF correctly, so io.Copy attempts
		// to write which returns io.ErrClosedPipe - just ignore
		// this for now.
		err = nil
	}
	return hdfsToObjectErr(ctx, err, bucket, key)
}
 | |
| 
 | |
| func (n *hdfsObjects) isObjectDir(ctx context.Context, bucket, object string) bool {
 | |
| 	f, err := n.clnt.Open(n.hdfsPathJoin(bucket, object))
 | |
| 	if err != nil {
 | |
| 		if os.IsNotExist(err) {
 | |
| 			return false
 | |
| 		}
 | |
| 		logger.LogIf(ctx, err)
 | |
| 		return false
 | |
| 	}
 | |
| 	defer f.Close()
 | |
| 	fis, err := f.Readdir(1)
 | |
| 	if err != nil && err != io.EOF {
 | |
| 		logger.LogIf(ctx, err)
 | |
| 		return false
 | |
| 	}
 | |
| 	// Readdir returns an io.EOF when len(fis) == 0.
 | |
| 	return len(fis) == 0
 | |
| }
 | |
| 
 | |
| // GetObjectInfo reads object info and replies back ObjectInfo.
 | |
| func (n *hdfsObjects) GetObjectInfo(ctx context.Context, bucket, object string, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
 | |
| 	_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
 | |
| 	if err != nil {
 | |
| 		return objInfo, hdfsToObjectErr(ctx, err, bucket)
 | |
| 	}
 | |
| 	if strings.HasSuffix(object, hdfsSeparator) && !n.isObjectDir(ctx, bucket, object) {
 | |
| 		return objInfo, hdfsToObjectErr(ctx, os.ErrNotExist, bucket, object)
 | |
| 	}
 | |
| 
 | |
| 	fi, err := n.clnt.Stat(n.hdfsPathJoin(bucket, object))
 | |
| 	if err != nil {
 | |
| 		return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
 | |
| 	}
 | |
| 	return minio.ObjectInfo{
 | |
| 		Bucket:  bucket,
 | |
| 		Name:    object,
 | |
| 		ModTime: fi.ModTime(),
 | |
| 		Size:    fi.Size(),
 | |
| 		IsDir:   fi.IsDir(),
 | |
| 		AccTime: fi.(*hdfs.FileInfo).AccessTime(),
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
// PutObject writes an object into the bucket.
//
// A trailing-slash key with zero size creates a directory "prefix".
// Regular objects are first streamed to a unique tmp file under the
// MinIO meta tmp bucket and then renamed into place, so a partially
// written object is never visible at its final path.
func (n *hdfsObjects) PutObject(ctx context.Context, bucket string, object string, r *minio.PutObjReader, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
	_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
	if err != nil {
		return objInfo, hdfsToObjectErr(ctx, err, bucket)
	}

	name := n.hdfsPathJoin(bucket, object)

	// If it's a directory, create a prefix instead of writing a file.
	if strings.HasSuffix(object, hdfsSeparator) && r.Size() == 0 {
		if err = n.clnt.MkdirAll(name, os.FileMode(0755)); err != nil {
			// Best-effort cleanup of anything partially created.
			n.deleteObject(n.hdfsPathJoin(bucket), name)
			return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
		}
	} else {
		// Stage the data under a unique tmp name first.
		tmpname := n.hdfsPathJoin(minioMetaTmpBucket, minio.MustGetUUID())
		var w *hdfs.FileWriter
		w, err = n.clnt.Create(tmpname)
		if err != nil {
			return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
		}
		// The tmp file is removed regardless of outcome; after a
		// successful rename this is a harmless no-op.
		defer n.deleteObject(n.hdfsPathJoin(minioMetaTmpBucket), tmpname)
		if _, err = io.Copy(w, r); err != nil {
			w.Close()
			return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
		}
		// Ensure the destination's parent directories exist before rename.
		dir := path.Dir(name)
		if dir != "" {
			if err = n.clnt.MkdirAll(dir, os.FileMode(0755)); err != nil {
				w.Close()
				n.deleteObject(n.hdfsPathJoin(bucket), dir)
				return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
			}
		}
		w.Close()
		if err = n.clnt.Rename(tmpname, name); err != nil {
			return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
		}
	}
	fi, err := n.clnt.Stat(name)
	if err != nil {
		return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
	}
	return minio.ObjectInfo{
		Bucket:  bucket,
		Name:    object,
		ETag:    r.MD5CurrentHexString(),
		ModTime: fi.ModTime(),
		Size:    fi.Size(),
		IsDir:   fi.IsDir(),
		AccTime: fi.(*hdfs.FileInfo).AccessTime(),
	}, nil
}
 | |
| 
 | |
| func (n *hdfsObjects) NewMultipartUpload(ctx context.Context, bucket string, object string, opts minio.ObjectOptions) (uploadID string, err error) {
 | |
| 	_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
 | |
| 	if err != nil {
 | |
| 		return uploadID, hdfsToObjectErr(ctx, err, bucket)
 | |
| 	}
 | |
| 
 | |
| 	uploadID = minio.MustGetUUID()
 | |
| 	if err = n.clnt.CreateEmptyFile(n.hdfsPathJoin(minioMetaTmpBucket, uploadID)); err != nil {
 | |
| 		return uploadID, hdfsToObjectErr(ctx, err, bucket)
 | |
| 	}
 | |
| 
 | |
| 	return uploadID, nil
 | |
| }
 | |
| 
 | |
| func (n *hdfsObjects) ListMultipartUploads(ctx context.Context, bucket string, prefix string, keyMarker string, uploadIDMarker string, delimiter string, maxUploads int) (lmi minio.ListMultipartsInfo, err error) {
 | |
| 	_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
 | |
| 	if err != nil {
 | |
| 		return lmi, hdfsToObjectErr(ctx, err, bucket)
 | |
| 	}
 | |
| 
 | |
| 	// It's decided not to support List Multipart Uploads, hence returning empty result.
 | |
| 	return lmi, nil
 | |
| }
 | |
| 
 | |
| func (n *hdfsObjects) checkUploadIDExists(ctx context.Context, bucket, object, uploadID string) (err error) {
 | |
| 	_, err = n.clnt.Stat(n.hdfsPathJoin(minioMetaTmpBucket, uploadID))
 | |
| 	if err != nil {
 | |
| 		return hdfsToObjectErr(ctx, err, bucket, object, uploadID)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // GetMultipartInfo returns multipart info of the uploadId of the object
 | |
| func (n *hdfsObjects) GetMultipartInfo(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (result minio.MultipartInfo, err error) {
 | |
| 	_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
 | |
| 	if err != nil {
 | |
| 		return result, hdfsToObjectErr(ctx, err, bucket)
 | |
| 	}
 | |
| 
 | |
| 	if err = n.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
 | |
| 		return result, err
 | |
| 	}
 | |
| 
 | |
| 	result.Bucket = bucket
 | |
| 	result.Object = object
 | |
| 	result.UploadID = uploadID
 | |
| 	return result, nil
 | |
| }
 | |
| 
 | |
| func (n *hdfsObjects) ListObjectParts(ctx context.Context, bucket, object, uploadID string, partNumberMarker int, maxParts int, opts minio.ObjectOptions) (result minio.ListPartsInfo, err error) {
 | |
| 	_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
 | |
| 	if err != nil {
 | |
| 		return result, hdfsToObjectErr(ctx, err, bucket)
 | |
| 	}
 | |
| 
 | |
| 	if err = n.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
 | |
| 		return result, err
 | |
| 	}
 | |
| 
 | |
| 	// It's decided not to support List parts, hence returning empty result.
 | |
| 	return result, nil
 | |
| }
 | |
| 
 | |
// CopyObjectPart implements server-side part copy by feeding the source's
// PutObjReader into PutObjectPart on the destination upload. Note that
// startOffset and length are not consulted here; whatever the provided
// reader yields is what gets written.
func (n *hdfsObjects) CopyObjectPart(ctx context.Context, srcBucket, srcObject, dstBucket, dstObject, uploadID string, partID int,
	startOffset int64, length int64, srcInfo minio.ObjectInfo, srcOpts, dstOpts minio.ObjectOptions) (minio.PartInfo, error) {
	return n.PutObjectPart(ctx, dstBucket, dstObject, uploadID, partID, srcInfo.PutObjReader, dstOpts)
}
 | |
| 
 | |
// PutObjectPart appends the part's data to the upload's staging file
// under the MinIO meta tmp bucket and returns the part metadata.
// NOTE(review): parts are appended in call order with no per-part
// bookkeeping, which appears to assume clients send parts sequentially
// and in order — confirm against the gateway's multipart semantics.
func (n *hdfsObjects) PutObjectPart(ctx context.Context, bucket, object, uploadID string, partID int, r *minio.PutObjReader, opts minio.ObjectOptions) (info minio.PartInfo, err error) {
	_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
	if err != nil {
		return info, hdfsToObjectErr(ctx, err, bucket)
	}

	var w *hdfs.FileWriter
	w, err = n.clnt.Append(n.hdfsPathJoin(minioMetaTmpBucket, uploadID))
	if err != nil {
		return info, hdfsToObjectErr(ctx, err, bucket, object, uploadID)
	}
	defer w.Close()
	_, err = io.Copy(w, r.Reader)
	if err != nil {
		return info, hdfsToObjectErr(ctx, err, bucket, object, uploadID)
	}

	info.PartNumber = partID
	info.ETag = r.MD5CurrentHexString()
	info.LastModified = minio.UTCNow()
	info.Size = r.Reader.Size()

	return info, nil
}
 | |
| 
 | |
// CompleteMultipartUpload finalizes a multipart upload by renaming the
// staging file into the object's final path, creating parent directories
// as needed. The returned ETag is the S3-compatible multipart MD5
// computed from the part ETags, not a hash of the file contents.
func (n *hdfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object, uploadID string, parts []minio.CompletePart, opts minio.ObjectOptions) (objInfo minio.ObjectInfo, err error) {
	_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
	if err != nil {
		return objInfo, hdfsToObjectErr(ctx, err, bucket)
	}

	if err = n.checkUploadIDExists(ctx, bucket, object, uploadID); err != nil {
		return objInfo, err
	}

	name := n.hdfsPathJoin(bucket, object)
	// Ensure the destination's parent directories exist before rename.
	dir := path.Dir(name)
	if dir != "" {
		if err = n.clnt.MkdirAll(dir, os.FileMode(0755)); err != nil {
			return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
		}
	}

	err = n.clnt.Rename(n.hdfsPathJoin(minioMetaTmpBucket, uploadID), name)
	// Object already exists is an error on HDFS
	// remove it and then create it again.
	if os.IsExist(err) {
		if err = n.clnt.Remove(name); err != nil {
			// Best-effort pruning of directories created above.
			if dir != "" {
				n.deleteObject(n.hdfsPathJoin(bucket), dir)
			}
			return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
		}
		if err = n.clnt.Rename(n.hdfsPathJoin(minioMetaTmpBucket, uploadID), name); err != nil {
			if dir != "" {
				n.deleteObject(n.hdfsPathJoin(bucket), dir)
			}
			return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
		}
	}
	fi, err := n.clnt.Stat(name)
	if err != nil {
		return objInfo, hdfsToObjectErr(ctx, err, bucket, object)
	}

	// Calculate s3 compatible md5sum for complete multipart.
	s3MD5 := minio.ComputeCompleteMultipartMD5(parts)

	return minio.ObjectInfo{
		Bucket:  bucket,
		Name:    object,
		ETag:    s3MD5,
		ModTime: fi.ModTime(),
		Size:    fi.Size(),
		IsDir:   fi.IsDir(),
		AccTime: fi.(*hdfs.FileInfo).AccessTime(),
	}, nil
}
 | |
| 
 | |
| func (n *hdfsObjects) AbortMultipartUpload(ctx context.Context, bucket, object, uploadID string, opts minio.ObjectOptions) (err error) {
 | |
| 	_, err = n.clnt.Stat(n.hdfsPathJoin(bucket))
 | |
| 	if err != nil {
 | |
| 		return hdfsToObjectErr(ctx, err, bucket)
 | |
| 	}
 | |
| 	return hdfsToObjectErr(ctx, n.clnt.Remove(n.hdfsPathJoin(minioMetaTmpBucket, uploadID)), bucket, object, uploadID)
 | |
| }
 |