mirror of
				https://github.com/minio/minio.git
				synced 2025-11-04 10:11:09 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			1369 lines
		
	
	
		
			43 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			1369 lines
		
	
	
		
			43 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright (c) 2015-2021 MinIO, Inc.
 | 
						|
//
 | 
						|
// This file is part of MinIO Object Storage stack
 | 
						|
//
 | 
						|
// This program is free software: you can redistribute it and/or modify
 | 
						|
// it under the terms of the GNU Affero General Public License as published by
 | 
						|
// the Free Software Foundation, either version 3 of the License, or
 | 
						|
// (at your option) any later version.
 | 
						|
//
 | 
						|
// This program is distributed in the hope that it will be useful
 | 
						|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
// GNU Affero General Public License for more details.
 | 
						|
//
 | 
						|
// You should have received a copy of the GNU Affero General Public License
 | 
						|
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
package cmd
 | 
						|
 | 
						|
import (
 | 
						|
	"bufio"
 | 
						|
	"context"
 | 
						|
	"encoding/binary"
 | 
						|
	"encoding/gob"
 | 
						|
	"encoding/hex"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"net/http"
 | 
						|
	"os/user"
 | 
						|
	"path"
 | 
						|
	"runtime"
 | 
						|
	"runtime/debug"
 | 
						|
	"strconv"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/minio/minio/internal/grid"
 | 
						|
	"github.com/tinylib/msgp/msgp"
 | 
						|
 | 
						|
	jwtreq "github.com/golang-jwt/jwt/v4/request"
 | 
						|
	"github.com/minio/madmin-go/v3"
 | 
						|
	"github.com/minio/minio/internal/config"
 | 
						|
	xhttp "github.com/minio/minio/internal/http"
 | 
						|
	xioutil "github.com/minio/minio/internal/ioutil"
 | 
						|
	xjwt "github.com/minio/minio/internal/jwt"
 | 
						|
	"github.com/minio/minio/internal/logger"
 | 
						|
	"github.com/minio/mux"
 | 
						|
	xnet "github.com/minio/pkg/v2/net"
 | 
						|
)
 | 
						|
 | 
						|
var errDiskStale = errors.New("drive stale")
 | 
						|
 | 
						|
// To abstract a disk over network.
 | 
						|
type storageRESTServer struct {
 | 
						|
	endpoint Endpoint
 | 
						|
}
 | 
						|
 | 
						|
var (
 | 
						|
	storageCheckPartsRPC     = grid.NewSingleHandler[*CheckPartsHandlerParams, grid.NoPayload](grid.HandlerCheckParts, func() *CheckPartsHandlerParams { return &CheckPartsHandlerParams{} }, grid.NewNoPayload)
 | 
						|
	storageDeleteFileRPC     = grid.NewSingleHandler[*DeleteFileHandlerParams, grid.NoPayload](grid.HandlerDeleteFile, func() *DeleteFileHandlerParams { return &DeleteFileHandlerParams{} }, grid.NewNoPayload).AllowCallRequestPool(true)
 | 
						|
	storageDeleteVersionRPC  = grid.NewSingleHandler[*DeleteVersionHandlerParams, grid.NoPayload](grid.HandlerDeleteVersion, func() *DeleteVersionHandlerParams { return &DeleteVersionHandlerParams{} }, grid.NewNoPayload)
 | 
						|
	storageDiskInfoRPC       = grid.NewSingleHandler[*DiskInfoOptions, *DiskInfo](grid.HandlerDiskInfo, func() *DiskInfoOptions { return &DiskInfoOptions{} }, func() *DiskInfo { return &DiskInfo{} }).WithSharedResponse().AllowCallRequestPool(true)
 | 
						|
	storageNSScannerRPC      = grid.NewStream[*nsScannerOptions, grid.NoPayload, *nsScannerResp](grid.HandlerNSScanner, func() *nsScannerOptions { return &nsScannerOptions{} }, nil, func() *nsScannerResp { return &nsScannerResp{} })
 | 
						|
	storageReadAllRPC        = grid.NewSingleHandler[*ReadAllHandlerParams, *grid.Bytes](grid.HandlerReadAll, func() *ReadAllHandlerParams { return &ReadAllHandlerParams{} }, grid.NewBytes).AllowCallRequestPool(true)
 | 
						|
	storageWriteAllRPC       = grid.NewSingleHandler[*WriteAllHandlerParams, grid.NoPayload](grid.HandlerWriteAll, func() *WriteAllHandlerParams { return &WriteAllHandlerParams{} }, grid.NewNoPayload)
 | 
						|
	storageReadVersionRPC    = grid.NewSingleHandler[*grid.MSS, *FileInfo](grid.HandlerReadVersion, grid.NewMSS, func() *FileInfo { return &FileInfo{} })
 | 
						|
	storageReadXLRPC         = grid.NewSingleHandler[*grid.MSS, *RawFileInfo](grid.HandlerReadXL, grid.NewMSS, func() *RawFileInfo { return &RawFileInfo{} })
 | 
						|
	storageRenameDataRPC     = grid.NewSingleHandler[*RenameDataHandlerParams, *RenameDataResp](grid.HandlerRenameData, func() *RenameDataHandlerParams { return &RenameDataHandlerParams{} }, func() *RenameDataResp { return &RenameDataResp{} })
 | 
						|
	storageRenameFileRPC     = grid.NewSingleHandler[*RenameFileHandlerParams, grid.NoPayload](grid.HandlerRenameFile, func() *RenameFileHandlerParams { return &RenameFileHandlerParams{} }, grid.NewNoPayload).AllowCallRequestPool(true)
 | 
						|
	storageStatVolRPC        = grid.NewSingleHandler[*grid.MSS, *VolInfo](grid.HandlerStatVol, grid.NewMSS, func() *VolInfo { return &VolInfo{} })
 | 
						|
	storageUpdateMetadataRPC = grid.NewSingleHandler[*MetadataHandlerParams, grid.NoPayload](grid.HandlerUpdateMetadata, func() *MetadataHandlerParams { return &MetadataHandlerParams{} }, grid.NewNoPayload)
 | 
						|
	storageWriteMetadataRPC  = grid.NewSingleHandler[*MetadataHandlerParams, grid.NoPayload](grid.HandlerWriteMetadata, func() *MetadataHandlerParams { return &MetadataHandlerParams{} }, grid.NewNoPayload)
 | 
						|
	storageListDirRPC        = grid.NewStream[*grid.MSS, grid.NoPayload, *ListDirResult](grid.HandlerListDir, grid.NewMSS, nil, func() *ListDirResult { return &ListDirResult{} }).WithOutCapacity(1)
 | 
						|
)
 | 
						|
 | 
						|
func getStorageViaEndpoint(endpoint Endpoint) StorageAPI {
 | 
						|
	globalLocalDrivesMu.RLock()
 | 
						|
	defer globalLocalDrivesMu.RUnlock()
 | 
						|
	if len(globalLocalSetDrives) == 0 {
 | 
						|
		for _, drive := range globalLocalDrives {
 | 
						|
			if drive != nil && drive.Endpoint().Equal(endpoint) {
 | 
						|
				return drive
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return globalLocalSetDrives[endpoint.PoolIdx][endpoint.SetIdx][endpoint.DiskIdx]
 | 
						|
}
 | 
						|
 | 
						|
func (s *storageRESTServer) getStorage() StorageAPI {
 | 
						|
	return getStorageViaEndpoint(s.endpoint)
 | 
						|
}
 | 
						|
 | 
						|
func (s *storageRESTServer) writeErrorResponse(w http.ResponseWriter, err error) {
 | 
						|
	err = unwrapAll(err)
 | 
						|
	switch err {
 | 
						|
	case errDiskStale:
 | 
						|
		w.WriteHeader(http.StatusPreconditionFailed)
 | 
						|
	case errFileNotFound, errFileVersionNotFound:
 | 
						|
		w.WriteHeader(http.StatusNotFound)
 | 
						|
	case errInvalidAccessKeyID, errAccessKeyDisabled, errNoAuthToken, errMalformedAuth, errAuthentication, errSkewedAuthTime:
 | 
						|
		w.WriteHeader(http.StatusUnauthorized)
 | 
						|
	case context.Canceled, context.DeadlineExceeded:
 | 
						|
		w.WriteHeader(499)
 | 
						|
	default:
 | 
						|
		w.WriteHeader(http.StatusForbidden)
 | 
						|
	}
 | 
						|
	w.Write([]byte(err.Error()))
 | 
						|
}
 | 
						|
 | 
						|
// DefaultSkewTime - skew time is 15 minutes between minio peers.
 | 
						|
const DefaultSkewTime = 15 * time.Minute
 | 
						|
 | 
						|
// Authenticates storage client's requests and validates for skewed time.
 | 
						|
func storageServerRequestValidate(r *http.Request) error {
 | 
						|
	token, err := jwtreq.AuthorizationHeaderExtractor.ExtractToken(r)
 | 
						|
	if err != nil {
 | 
						|
		if err == jwtreq.ErrNoTokenInRequest {
 | 
						|
			return errNoAuthToken
 | 
						|
		}
 | 
						|
		return errMalformedAuth
 | 
						|
	}
 | 
						|
 | 
						|
	claims := xjwt.NewStandardClaims()
 | 
						|
	if err = xjwt.ParseWithStandardClaims(token, claims, []byte(globalActiveCred.SecretKey)); err != nil {
 | 
						|
		return errAuthentication
 | 
						|
	}
 | 
						|
 | 
						|
	owner := claims.AccessKey == globalActiveCred.AccessKey || claims.Subject == globalActiveCred.AccessKey
 | 
						|
	if !owner {
 | 
						|
		return errAuthentication
 | 
						|
	}
 | 
						|
 | 
						|
	if claims.Audience != r.URL.RawQuery {
 | 
						|
		return errAuthentication
 | 
						|
	}
 | 
						|
 | 
						|
	requestTimeStr := r.Header.Get("X-Minio-Time")
 | 
						|
	requestTime, err := time.Parse(time.RFC3339, requestTimeStr)
 | 
						|
	if err != nil {
 | 
						|
		return errMalformedAuth
 | 
						|
	}
 | 
						|
	utcNow := UTCNow()
 | 
						|
	delta := requestTime.Sub(utcNow)
 | 
						|
	if delta < 0 {
 | 
						|
		delta *= -1
 | 
						|
	}
 | 
						|
	if delta > DefaultSkewTime {
 | 
						|
		return errSkewedAuthTime
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// IsAuthValid - To authenticate and verify the time difference.
 | 
						|
func (s *storageRESTServer) IsAuthValid(w http.ResponseWriter, r *http.Request) bool {
 | 
						|
	if s.getStorage() == nil {
 | 
						|
		s.writeErrorResponse(w, errDiskNotFound)
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	if err := storageServerRequestValidate(r); err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
// IsValid - To authenticate and check if the disk-id in the request corresponds to the underlying disk.
 | 
						|
func (s *storageRESTServer) IsValid(w http.ResponseWriter, r *http.Request) bool {
 | 
						|
	if !s.IsAuthValid(w, r) {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	if err := r.ParseForm(); err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	diskID := r.Form.Get(storageRESTDiskID)
 | 
						|
	if diskID == "" {
 | 
						|
		// Request sent empty disk-id, we allow the request
 | 
						|
		// as the peer might be coming up and trying to read format.json
 | 
						|
		// or create format.json
 | 
						|
		return true
 | 
						|
	}
 | 
						|
 | 
						|
	storedDiskID, err := s.getStorage().GetDiskID()
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	if diskID != storedDiskID {
 | 
						|
		s.writeErrorResponse(w, errDiskStale)
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	// If format.json is available and request sent the right disk-id, we allow the request
 | 
						|
	return true
 | 
						|
}
 | 
						|
 | 
						|
// checkID - check if the disk-id in the request corresponds to the underlying disk.
 | 
						|
func (s *storageRESTServer) checkID(wantID string) bool {
 | 
						|
	if s.getStorage() == nil {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
	if wantID == "" {
 | 
						|
		// Request sent empty disk-id, we allow the request
 | 
						|
		// as the peer might be coming up and trying to read format.json
 | 
						|
		// or create format.json
 | 
						|
		return true
 | 
						|
	}
 | 
						|
 | 
						|
	storedDiskID, err := s.getStorage().GetDiskID()
 | 
						|
	if err != nil {
 | 
						|
		return false
 | 
						|
	}
 | 
						|
 | 
						|
	return wantID == storedDiskID
 | 
						|
}
 | 
						|
 | 
						|
// HealthHandler handler checks if disk is stale
 | 
						|
func (s *storageRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request) {
 | 
						|
	s.IsValid(w, r)
 | 
						|
}
 | 
						|
 | 
						|
// DiskInfoHandler - returns disk info.
 | 
						|
func (s *storageRESTServer) DiskInfoHandler(opts *DiskInfoOptions) (*DiskInfo, *grid.RemoteErr) {
 | 
						|
	if !s.checkID(opts.DiskID) {
 | 
						|
		return nil, grid.NewRemoteErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
	info, err := s.getStorage().DiskInfo(context.Background(), *opts)
 | 
						|
	if err != nil {
 | 
						|
		info.Error = err.Error()
 | 
						|
	}
 | 
						|
	return &info, nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *storageRESTServer) NSScannerHandler(ctx context.Context, params *nsScannerOptions, out chan<- *nsScannerResp) *grid.RemoteErr {
 | 
						|
	if !s.checkID(params.DiskID) {
 | 
						|
		return grid.NewRemoteErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
	if params.Cache == nil {
 | 
						|
		return grid.NewRemoteErrString("NSScannerHandler: provided cache is nil")
 | 
						|
	}
 | 
						|
 | 
						|
	// Collect updates, stream them before the full cache is sent.
 | 
						|
	updates := make(chan dataUsageEntry, 1)
 | 
						|
	var wg sync.WaitGroup
 | 
						|
	wg.Add(1)
 | 
						|
	go func() {
 | 
						|
		defer wg.Done()
 | 
						|
		for update := range updates {
 | 
						|
			resp := storageNSScannerRPC.NewResponse()
 | 
						|
			resp.Update = &update
 | 
						|
			out <- resp
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	ui, err := s.getStorage().NSScanner(ctx, *params.Cache, updates, madmin.HealScanMode(params.ScanMode), nil)
 | 
						|
	wg.Wait()
 | 
						|
	if err != nil {
 | 
						|
		return grid.NewRemoteErr(err)
 | 
						|
	}
 | 
						|
	// Send final response.
 | 
						|
	resp := storageNSScannerRPC.NewResponse()
 | 
						|
	resp.Final = &ui
 | 
						|
	out <- resp
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// MakeVolHandler - make a volume.
 | 
						|
func (s *storageRESTServer) MakeVolHandler(w http.ResponseWriter, r *http.Request) {
 | 
						|
	if !s.IsValid(w, r) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	volume := r.Form.Get(storageRESTVolume)
 | 
						|
	err := s.getStorage().MakeVol(r.Context(), volume)
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// MakeVolBulkHandler - create multiple volumes as a bulk operation.
 | 
						|
func (s *storageRESTServer) MakeVolBulkHandler(w http.ResponseWriter, r *http.Request) {
 | 
						|
	if !s.IsValid(w, r) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	volumes := strings.Split(r.Form.Get(storageRESTVolumes), ",")
 | 
						|
	err := s.getStorage().MakeVolBulk(r.Context(), volumes...)
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// StatVolHandler - stat a volume.
 | 
						|
func (s *storageRESTServer) StatVolHandler(params *grid.MSS) (*VolInfo, *grid.RemoteErr) {
 | 
						|
	if !s.checkID(params.Get(storageRESTDiskID)) {
 | 
						|
		return nil, grid.NewRemoteErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
	info, err := s.getStorage().StatVol(context.Background(), params.Get(storageRESTVolume))
 | 
						|
	if err != nil {
 | 
						|
		return nil, grid.NewRemoteErr(err)
 | 
						|
	}
 | 
						|
	return &info, nil
 | 
						|
}
 | 
						|
 | 
						|
// AppendFileHandler - append data from the request to the file specified.
 | 
						|
func (s *storageRESTServer) AppendFileHandler(w http.ResponseWriter, r *http.Request) {
 | 
						|
	if !s.IsValid(w, r) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	volume := r.Form.Get(storageRESTVolume)
 | 
						|
	filePath := r.Form.Get(storageRESTFilePath)
 | 
						|
 | 
						|
	buf := make([]byte, r.ContentLength)
 | 
						|
	_, err := io.ReadFull(r.Body, buf)
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	err = s.getStorage().AppendFile(r.Context(), volume, filePath, buf)
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// CreateFileHandler - copy the contents from the request.
 | 
						|
func (s *storageRESTServer) CreateFileHandler(w http.ResponseWriter, r *http.Request) {
 | 
						|
	if !s.IsValid(w, r) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	volume := r.Form.Get(storageRESTVolume)
 | 
						|
	filePath := r.Form.Get(storageRESTFilePath)
 | 
						|
	origvolume := r.Form.Get(storageRESTOrigVolume)
 | 
						|
 | 
						|
	fileSizeStr := r.Form.Get(storageRESTLength)
 | 
						|
	fileSize, err := strconv.Atoi(fileSizeStr)
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	done, body := keepHTTPReqResponseAlive(w, r)
 | 
						|
	done(s.getStorage().CreateFile(r.Context(), origvolume, volume, filePath, int64(fileSize), body))
 | 
						|
}
 | 
						|
 | 
						|
// DeleteVersionHandler delete updated metadata.
 | 
						|
func (s *storageRESTServer) DeleteVersionHandler(p *DeleteVersionHandlerParams) (np grid.NoPayload, gerr *grid.RemoteErr) {
 | 
						|
	if !s.checkID(p.DiskID) {
 | 
						|
		return np, grid.NewRemoteErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
	volume := p.Volume
 | 
						|
	filePath := p.FilePath
 | 
						|
	forceDelMarker := p.ForceDelMarker
 | 
						|
 | 
						|
	opts := DeleteOptions{}
 | 
						|
	err := s.getStorage().DeleteVersion(context.Background(), volume, filePath, p.FI, forceDelMarker, opts)
 | 
						|
	return np, grid.NewRemoteErr(err)
 | 
						|
}
 | 
						|
 | 
						|
// ReadVersionHandlerWS read metadata of versionID
 | 
						|
func (s *storageRESTServer) ReadVersionHandlerWS(params *grid.MSS) (*FileInfo, *grid.RemoteErr) {
 | 
						|
	if !s.checkID(params.Get(storageRESTDiskID)) {
 | 
						|
		return nil, grid.NewRemoteErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
	origvolume := params.Get(storageRESTOrigVolume)
 | 
						|
	volume := params.Get(storageRESTVolume)
 | 
						|
	filePath := params.Get(storageRESTFilePath)
 | 
						|
	versionID := params.Get(storageRESTVersionID)
 | 
						|
	readData, err := strconv.ParseBool(params.Get(storageRESTReadData))
 | 
						|
	if err != nil {
 | 
						|
		return nil, grid.NewRemoteErr(err)
 | 
						|
	}
 | 
						|
 | 
						|
	healing, err := strconv.ParseBool(params.Get(storageRESTHealing))
 | 
						|
	if err != nil {
 | 
						|
		return nil, grid.NewRemoteErr(err)
 | 
						|
	}
 | 
						|
 | 
						|
	fi, err := s.getStorage().ReadVersion(context.Background(), origvolume, volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing})
 | 
						|
	if err != nil {
 | 
						|
		return nil, grid.NewRemoteErr(err)
 | 
						|
	}
 | 
						|
	return &fi, nil
 | 
						|
}
 | 
						|
 | 
						|
// ReadVersionHandler read metadata of versionID
 | 
						|
func (s *storageRESTServer) ReadVersionHandler(w http.ResponseWriter, r *http.Request) {
 | 
						|
	if !s.IsValid(w, r) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	origvolume := r.Form.Get(storageRESTOrigVolume)
 | 
						|
	volume := r.Form.Get(storageRESTVolume)
 | 
						|
	filePath := r.Form.Get(storageRESTFilePath)
 | 
						|
	versionID := r.Form.Get(storageRESTVersionID)
 | 
						|
	readData, err := strconv.ParseBool(r.Form.Get(storageRESTReadData))
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	healing, err := strconv.ParseBool(r.Form.Get(storageRESTHealing))
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	fi, err := s.getStorage().ReadVersion(r.Context(), origvolume, volume, filePath, versionID, ReadOptions{ReadData: readData, Healing: healing})
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	logger.LogIf(r.Context(), msgp.Encode(w, &fi))
 | 
						|
}
 | 
						|
 | 
						|
// WriteMetadataHandler rpc handler to write new updated metadata.
 | 
						|
func (s *storageRESTServer) WriteMetadataHandler(p *MetadataHandlerParams) (np grid.NoPayload, gerr *grid.RemoteErr) {
 | 
						|
	if !s.checkID(p.DiskID) {
 | 
						|
		return grid.NewNPErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
 | 
						|
	volume := p.Volume
 | 
						|
	filePath := p.FilePath
 | 
						|
	origvolume := p.OrigVolume
 | 
						|
 | 
						|
	err := s.getStorage().WriteMetadata(context.Background(), origvolume, volume, filePath, p.FI)
 | 
						|
	return np, grid.NewRemoteErr(err)
 | 
						|
}
 | 
						|
 | 
						|
// UpdateMetadataHandler update new updated metadata.
 | 
						|
func (s *storageRESTServer) UpdateMetadataHandler(p *MetadataHandlerParams) (grid.NoPayload, *grid.RemoteErr) {
 | 
						|
	if !s.checkID(p.DiskID) {
 | 
						|
		return grid.NewNPErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
	volume := p.Volume
 | 
						|
	filePath := p.FilePath
 | 
						|
 | 
						|
	return grid.NewNPErr(s.getStorage().UpdateMetadata(context.Background(), volume, filePath, p.FI, p.UpdateOpts))
 | 
						|
}
 | 
						|
 | 
						|
// CheckPartsHandler - check if a file metadata exists.
 | 
						|
func (s *storageRESTServer) CheckPartsHandler(p *CheckPartsHandlerParams) (grid.NoPayload, *grid.RemoteErr) {
 | 
						|
	if !s.checkID(p.DiskID) {
 | 
						|
		return grid.NewNPErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
	volume := p.Volume
 | 
						|
	filePath := p.FilePath
 | 
						|
	return grid.NewNPErr(s.getStorage().CheckParts(context.Background(), volume, filePath, p.FI))
 | 
						|
}
 | 
						|
 | 
						|
func (s *storageRESTServer) WriteAllHandler(p *WriteAllHandlerParams) (grid.NoPayload, *grid.RemoteErr) {
 | 
						|
	if !s.checkID(p.DiskID) {
 | 
						|
		return grid.NewNPErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
 | 
						|
	volume := p.Volume
 | 
						|
	filePath := p.FilePath
 | 
						|
 | 
						|
	return grid.NewNPErr(s.getStorage().WriteAll(context.Background(), volume, filePath, p.Buf))
 | 
						|
}
 | 
						|
 | 
						|
// ReadAllHandler - read all the contents of a file.
 | 
						|
func (s *storageRESTServer) ReadAllHandler(p *ReadAllHandlerParams) (*grid.Bytes, *grid.RemoteErr) {
 | 
						|
	if !s.checkID(p.DiskID) {
 | 
						|
		return nil, grid.NewRemoteErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
 | 
						|
	volume := p.Volume
 | 
						|
	filePath := p.FilePath
 | 
						|
 | 
						|
	buf, err := s.getStorage().ReadAll(context.Background(), volume, filePath)
 | 
						|
	return grid.NewBytesWith(buf), grid.NewRemoteErr(err)
 | 
						|
}
 | 
						|
 | 
						|
// ReadXLHandler - read xl.meta for an object at path.
 | 
						|
func (s *storageRESTServer) ReadXLHandler(w http.ResponseWriter, r *http.Request) {
 | 
						|
	if !s.IsValid(w, r) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	volume := r.Form.Get(storageRESTVolume)
 | 
						|
	filePath := r.Form.Get(storageRESTFilePath)
 | 
						|
	readData, err := strconv.ParseBool(r.Form.Get(storageRESTReadData))
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	rf, err := s.getStorage().ReadXL(r.Context(), volume, filePath, readData)
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	logger.LogIf(r.Context(), msgp.Encode(w, &rf))
 | 
						|
}
 | 
						|
 | 
						|
// ReadXLHandlerWS - read xl.meta for an object at path.
 | 
						|
func (s *storageRESTServer) ReadXLHandlerWS(params *grid.MSS) (*RawFileInfo, *grid.RemoteErr) {
 | 
						|
	if !s.checkID(params.Get(storageRESTDiskID)) {
 | 
						|
		return nil, grid.NewRemoteErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
	volume := params.Get(storageRESTVolume)
 | 
						|
	filePath := params.Get(storageRESTFilePath)
 | 
						|
	readData, err := strconv.ParseBool(params.Get(storageRESTReadData))
 | 
						|
	if err != nil {
 | 
						|
		return nil, grid.NewRemoteErr(err)
 | 
						|
	}
 | 
						|
 | 
						|
	rf, err := s.getStorage().ReadXL(context.Background(), volume, filePath, readData)
 | 
						|
	if err != nil {
 | 
						|
		return nil, grid.NewRemoteErr(err)
 | 
						|
	}
 | 
						|
 | 
						|
	return &rf, nil
 | 
						|
}
 | 
						|
 | 
						|
// ReadFileHandler - read section of a file.
 | 
						|
func (s *storageRESTServer) ReadFileHandler(w http.ResponseWriter, r *http.Request) {
 | 
						|
	if !s.IsValid(w, r) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	volume := r.Form.Get(storageRESTVolume)
 | 
						|
	filePath := r.Form.Get(storageRESTFilePath)
 | 
						|
	offset, err := strconv.Atoi(r.Form.Get(storageRESTOffset))
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	length, err := strconv.Atoi(r.Form.Get(storageRESTLength))
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if offset < 0 || length < 0 {
 | 
						|
		s.writeErrorResponse(w, errInvalidArgument)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	var verifier *BitrotVerifier
 | 
						|
	if r.Form.Get(storageRESTBitrotAlgo) != "" {
 | 
						|
		hashStr := r.Form.Get(storageRESTBitrotHash)
 | 
						|
		var hash []byte
 | 
						|
		hash, err = hex.DecodeString(hashStr)
 | 
						|
		if err != nil {
 | 
						|
			s.writeErrorResponse(w, err)
 | 
						|
			return
 | 
						|
		}
 | 
						|
		verifier = NewBitrotVerifier(BitrotAlgorithmFromString(r.Form.Get(storageRESTBitrotAlgo)), hash)
 | 
						|
	}
 | 
						|
	buf := make([]byte, length)
 | 
						|
	defer metaDataPoolPut(buf) // Reuse if we can.
 | 
						|
	_, err = s.getStorage().ReadFile(r.Context(), volume, filePath, int64(offset), buf, verifier)
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	w.Header().Set(xhttp.ContentLength, strconv.Itoa(len(buf)))
 | 
						|
	w.Write(buf)
 | 
						|
}
 | 
						|
 | 
						|
// ReadFileStreamHandler - read section of a file.
 | 
						|
func (s *storageRESTServer) ReadFileStreamHandler(w http.ResponseWriter, r *http.Request) {
 | 
						|
	if !s.IsValid(w, r) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	volume := r.Form.Get(storageRESTVolume)
 | 
						|
	filePath := r.Form.Get(storageRESTFilePath)
 | 
						|
	offset, err := strconv.Atoi(r.Form.Get(storageRESTOffset))
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	length, err := strconv.Atoi(r.Form.Get(storageRESTLength))
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	w.Header().Set(xhttp.ContentLength, strconv.Itoa(length))
 | 
						|
 | 
						|
	rc, err := s.getStorage().ReadFileStream(r.Context(), volume, filePath, int64(offset), int64(length))
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	defer rc.Close()
 | 
						|
 | 
						|
	rf, ok := w.(io.ReaderFrom)
 | 
						|
	if ok && runtime.GOOS != "windows" {
 | 
						|
		// Attempt to use splice/sendfile() optimization, A very specific behavior mentioned below is necessary.
 | 
						|
		// See https://github.com/golang/go/blob/f7c5cbb82087c55aa82081e931e0142783700ce8/src/net/sendfile_linux.go#L20
 | 
						|
		// Windows can lock up with this optimization, so we fall back to regular copy.
 | 
						|
		sr, ok := rc.(*sendFileReader)
 | 
						|
		if ok {
 | 
						|
			_, err = rf.ReadFrom(sr.Reader)
 | 
						|
			if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
 | 
						|
				logger.LogIf(r.Context(), err)
 | 
						|
			}
 | 
						|
			if err == nil || !errors.Is(err, xhttp.ErrNotImplemented) {
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	} // Fallback to regular copy
 | 
						|
 | 
						|
	_, err = xioutil.Copy(w, rc)
 | 
						|
	if !xnet.IsNetworkOrHostDown(err, true) { // do not need to log disconnected clients
 | 
						|
		logger.LogIf(r.Context(), err)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// ListDirHandler - list a directory.
 | 
						|
func (s *storageRESTServer) ListDirHandler(ctx context.Context, params *grid.MSS, out chan<- *ListDirResult) *grid.RemoteErr {
 | 
						|
	if !s.checkID(params.Get(storageRESTDiskID)) {
 | 
						|
		return grid.NewRemoteErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
	volume := params.Get(storageRESTVolume)
 | 
						|
	dirPath := params.Get(storageRESTDirPath)
 | 
						|
	origvolume := params.Get(storageRESTOrigVolume)
 | 
						|
	count, err := strconv.Atoi(params.Get(storageRESTCount))
 | 
						|
	if err != nil {
 | 
						|
		return grid.NewRemoteErr(err)
 | 
						|
	}
 | 
						|
 | 
						|
	entries, err := s.getStorage().ListDir(ctx, origvolume, volume, dirPath, count)
 | 
						|
	if err != nil {
 | 
						|
		return grid.NewRemoteErr(err)
 | 
						|
	}
 | 
						|
	out <- &ListDirResult{Entries: entries}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// DeleteFileHandler - delete a file.
 | 
						|
func (s *storageRESTServer) DeleteFileHandler(p *DeleteFileHandlerParams) (grid.NoPayload, *grid.RemoteErr) {
 | 
						|
	if !s.checkID(p.DiskID) {
 | 
						|
		return grid.NewNPErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
	return grid.NewNPErr(s.getStorage().Delete(context.Background(), p.Volume, p.FilePath, p.Opts))
 | 
						|
}
 | 
						|
 | 
						|
// DeleteVersionsErrsResp - collection of delete errors
 | 
						|
// for bulk version deletes
 | 
						|
type DeleteVersionsErrsResp struct {
 | 
						|
	Errs []error
 | 
						|
}
 | 
						|
 | 
						|
// DeleteVersionsHandler - delete a set of a versions.
 | 
						|
func (s *storageRESTServer) DeleteVersionsHandler(w http.ResponseWriter, r *http.Request) {
 | 
						|
	if !s.IsValid(w, r) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	volume := r.Form.Get(storageRESTVolume)
 | 
						|
	totalVersions, err := strconv.Atoi(r.Form.Get(storageRESTTotalVersions))
 | 
						|
	if err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	versions := make([]FileInfoVersions, totalVersions)
 | 
						|
	decoder := msgpNewReader(r.Body)
 | 
						|
	defer readMsgpReaderPoolPut(decoder)
 | 
						|
	for i := 0; i < totalVersions; i++ {
 | 
						|
		dst := &versions[i]
 | 
						|
		if err := dst.DecodeMsg(decoder); err != nil {
 | 
						|
			s.writeErrorResponse(w, err)
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	dErrsResp := &DeleteVersionsErrsResp{Errs: make([]error, totalVersions)}
 | 
						|
 | 
						|
	setEventStreamHeaders(w)
 | 
						|
	encoder := gob.NewEncoder(w)
 | 
						|
	done := keepHTTPResponseAlive(w)
 | 
						|
 | 
						|
	opts := DeleteOptions{}
 | 
						|
	errs := s.getStorage().DeleteVersions(r.Context(), volume, versions, opts)
 | 
						|
	done(nil)
 | 
						|
	for idx := range versions {
 | 
						|
		if errs[idx] != nil {
 | 
						|
			dErrsResp.Errs[idx] = StorageErr(errs[idx].Error())
 | 
						|
		}
 | 
						|
	}
 | 
						|
	encoder.Encode(dErrsResp)
 | 
						|
}
 | 
						|
 | 
						|
// RenameDataHandler - renames a meta object and data dir to destination.
 | 
						|
func (s *storageRESTServer) RenameDataHandler(p *RenameDataHandlerParams) (*RenameDataResp, *grid.RemoteErr) {
 | 
						|
	if !s.checkID(p.DiskID) {
 | 
						|
		return nil, grid.NewRemoteErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
 | 
						|
	sign, err := s.getStorage().RenameData(context.Background(), p.SrcVolume, p.SrcPath, p.FI, p.DstVolume, p.DstPath, p.Opts)
 | 
						|
	resp := &RenameDataResp{
 | 
						|
		Signature: sign,
 | 
						|
	}
 | 
						|
	return resp, grid.NewRemoteErr(err)
 | 
						|
}
 | 
						|
 | 
						|
// RenameFileHandler - rename a file from source to destination
 | 
						|
func (s *storageRESTServer) RenameFileHandler(p *RenameFileHandlerParams) (grid.NoPayload, *grid.RemoteErr) {
 | 
						|
	if !s.checkID(p.DiskID) {
 | 
						|
		return grid.NewNPErr(errDiskNotFound)
 | 
						|
	}
 | 
						|
	return grid.NewNPErr(s.getStorage().RenameFile(context.Background(), p.SrcVolume, p.SrcFilePath, p.DstVolume, p.DstFilePath))
 | 
						|
}
 | 
						|
 | 
						|
// CleanAbandonedDataHandler - Clean unused data directories.
 | 
						|
func (s *storageRESTServer) CleanAbandonedDataHandler(w http.ResponseWriter, r *http.Request) {
 | 
						|
	if !s.IsValid(w, r) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	volume := r.Form.Get(storageRESTVolume)
 | 
						|
	filePath := r.Form.Get(storageRESTFilePath)
 | 
						|
	if volume == "" || filePath == "" {
 | 
						|
		return // Ignore
 | 
						|
	}
 | 
						|
	keepHTTPResponseAlive(w)(s.getStorage().CleanAbandonedData(r.Context(), volume, filePath))
 | 
						|
}
 | 
						|
 | 
						|
// closeNotifier is itself a ReadCloser that will notify when either an error occurs or
 | 
						|
// the Close() function is called.
 | 
						|
type closeNotifier struct {
 | 
						|
	rc   io.ReadCloser
 | 
						|
	done chan struct{}
 | 
						|
}
 | 
						|
 | 
						|
func (c *closeNotifier) Read(p []byte) (n int, err error) {
 | 
						|
	n, err = c.rc.Read(p)
 | 
						|
	if err != nil {
 | 
						|
		if c.done != nil {
 | 
						|
			xioutil.SafeClose(c.done)
 | 
						|
			c.done = nil
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return n, err
 | 
						|
}
 | 
						|
 | 
						|
func (c *closeNotifier) Close() error {
 | 
						|
	if c.done != nil {
 | 
						|
		xioutil.SafeClose(c.done)
 | 
						|
		c.done = nil
 | 
						|
	}
 | 
						|
	return c.rc.Close()
 | 
						|
}
 | 
						|
 | 
						|
// keepHTTPReqResponseAlive can be used to avoid timeouts with long storage
 | 
						|
// operations, such as bitrot verification or data usage scanning.
 | 
						|
// Every 10 seconds a space character is sent.
 | 
						|
// keepHTTPReqResponseAlive will wait for the returned body to be read before starting the ticker.
 | 
						|
// The returned function should always be called to release resources.
 | 
						|
// An optional error can be sent which will be picked as text only error,
 | 
						|
// without its original type by the receiver.
 | 
						|
// waitForHTTPResponse should be used to the receiving side.
 | 
						|
func keepHTTPReqResponseAlive(w http.ResponseWriter, r *http.Request) (resp func(error), body io.ReadCloser) {
 | 
						|
	bodyDoneCh := make(chan struct{})
 | 
						|
	doneCh := make(chan error)
 | 
						|
	ctx := r.Context()
 | 
						|
	go func() {
 | 
						|
		canWrite := true
 | 
						|
		write := func(b []byte) {
 | 
						|
			if canWrite {
 | 
						|
				n, err := w.Write(b)
 | 
						|
				if err != nil || n != len(b) {
 | 
						|
					canWrite = false
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
		// Wait for body to be read.
 | 
						|
		select {
 | 
						|
		case <-ctx.Done():
 | 
						|
		case <-bodyDoneCh:
 | 
						|
		case err := <-doneCh:
 | 
						|
			if err != nil {
 | 
						|
				write([]byte{1})
 | 
						|
				write([]byte(err.Error()))
 | 
						|
			} else {
 | 
						|
				write([]byte{0})
 | 
						|
			}
 | 
						|
			xioutil.SafeClose(doneCh)
 | 
						|
			return
 | 
						|
		}
 | 
						|
		defer xioutil.SafeClose(doneCh)
 | 
						|
		// Initiate ticker after body has been read.
 | 
						|
		ticker := time.NewTicker(time.Second * 10)
 | 
						|
		for {
 | 
						|
			select {
 | 
						|
			case <-ticker.C:
 | 
						|
				// Response not ready, write a filler byte.
 | 
						|
				write([]byte{32})
 | 
						|
				if canWrite {
 | 
						|
					w.(http.Flusher).Flush()
 | 
						|
				}
 | 
						|
			case err := <-doneCh:
 | 
						|
				if err != nil {
 | 
						|
					write([]byte{1})
 | 
						|
					write([]byte(err.Error()))
 | 
						|
				} else {
 | 
						|
					write([]byte{0})
 | 
						|
				}
 | 
						|
				ticker.Stop()
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	return func(err error) {
 | 
						|
		if doneCh == nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
 | 
						|
		// Indicate we are ready to write.
 | 
						|
		doneCh <- err
 | 
						|
 | 
						|
		// Wait for channel to be closed so we don't race on writes.
 | 
						|
		<-doneCh
 | 
						|
 | 
						|
		// Clear so we can be called multiple times without crashing.
 | 
						|
		doneCh = nil
 | 
						|
	}, &closeNotifier{rc: r.Body, done: bodyDoneCh}
 | 
						|
}
 | 
						|
 | 
						|
// keepHTTPResponseAlive can be used to avoid timeouts with long storage
 | 
						|
// operations, such as bitrot verification or data usage scanning.
 | 
						|
// keepHTTPResponseAlive may NOT be used until the request body has been read,
 | 
						|
// use keepHTTPReqResponseAlive instead.
 | 
						|
// Every 10 seconds a space character is sent.
 | 
						|
// The returned function should always be called to release resources.
 | 
						|
// An optional error can be sent which will be picked as text only error,
 | 
						|
// without its original type by the receiver.
 | 
						|
// waitForHTTPResponse should be used to the receiving side.
 | 
						|
func keepHTTPResponseAlive(w http.ResponseWriter) func(error) {
 | 
						|
	doneCh := make(chan error)
 | 
						|
	go func() {
 | 
						|
		canWrite := true
 | 
						|
		write := func(b []byte) {
 | 
						|
			if canWrite {
 | 
						|
				n, err := w.Write(b)
 | 
						|
				if err != nil || n != len(b) {
 | 
						|
					canWrite = false
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
		defer xioutil.SafeClose(doneCh)
 | 
						|
		ticker := time.NewTicker(time.Second * 10)
 | 
						|
		defer ticker.Stop()
 | 
						|
		for {
 | 
						|
			select {
 | 
						|
			case <-ticker.C:
 | 
						|
				// Response not ready, write a filler byte.
 | 
						|
				write([]byte{32})
 | 
						|
				if canWrite {
 | 
						|
					w.(http.Flusher).Flush()
 | 
						|
				}
 | 
						|
			case err := <-doneCh:
 | 
						|
				if err != nil {
 | 
						|
					write([]byte{1})
 | 
						|
					write([]byte(err.Error()))
 | 
						|
				} else {
 | 
						|
					write([]byte{0})
 | 
						|
				}
 | 
						|
				return
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	return func(err error) {
 | 
						|
		if doneCh == nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
		// Indicate we are ready to write.
 | 
						|
		doneCh <- err
 | 
						|
 | 
						|
		// Wait for channel to be closed so we don't race on writes.
 | 
						|
		<-doneCh
 | 
						|
 | 
						|
		// Clear so we can be called multiple times without crashing.
 | 
						|
		doneCh = nil
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// waitForHTTPResponse will wait for responses where keepHTTPResponseAlive
 | 
						|
// has been used.
 | 
						|
// The returned reader contains the payload.
 | 
						|
func waitForHTTPResponse(respBody io.Reader) (io.Reader, error) {
 | 
						|
	reader := bufio.NewReader(respBody)
 | 
						|
	for {
 | 
						|
		b, err := reader.ReadByte()
 | 
						|
		if err != nil {
 | 
						|
			return nil, err
 | 
						|
		}
 | 
						|
		// Check if we have a response ready or a filler byte.
 | 
						|
		switch b {
 | 
						|
		case 0:
 | 
						|
			return reader, nil
 | 
						|
		case 1:
 | 
						|
			errorText, err := io.ReadAll(reader)
 | 
						|
			if err != nil {
 | 
						|
				return nil, err
 | 
						|
			}
 | 
						|
			return nil, errors.New(string(errorText))
 | 
						|
		case 32:
 | 
						|
			continue
 | 
						|
		default:
 | 
						|
			return nil, fmt.Errorf("unexpected filler byte: %d", b)
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// httpStreamResponse allows streaming a response, but still send an error.
 | 
						|
type httpStreamResponse struct {
 | 
						|
	done  chan error
 | 
						|
	block chan []byte
 | 
						|
	err   error
 | 
						|
}
 | 
						|
 | 
						|
// Write part of the streaming response.
 | 
						|
// Note that upstream errors are currently not forwarded, but may be in the future.
 | 
						|
func (h *httpStreamResponse) Write(b []byte) (int, error) {
 | 
						|
	if len(b) == 0 || h.err != nil {
 | 
						|
		// Ignore 0 length blocks
 | 
						|
		return 0, h.err
 | 
						|
	}
 | 
						|
	tmp := make([]byte, len(b))
 | 
						|
	copy(tmp, b)
 | 
						|
	h.block <- tmp
 | 
						|
	return len(b), h.err
 | 
						|
}
 | 
						|
 | 
						|
// CloseWithError will close the stream and return the specified error.
 | 
						|
// This can be done several times, but only the first error will be sent.
 | 
						|
// After calling this the stream should not be written to.
 | 
						|
func (h *httpStreamResponse) CloseWithError(err error) {
 | 
						|
	if h.done == nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	h.done <- err
 | 
						|
	h.err = err
 | 
						|
	// Indicates that the response is done.
 | 
						|
	<-h.done
 | 
						|
	h.done = nil
 | 
						|
}
 | 
						|
 | 
						|
// streamHTTPResponse can be used to avoid timeouts with long storage
 | 
						|
// operations, such as bitrot verification or data usage scanning.
 | 
						|
// Every 10 seconds a space character is sent.
 | 
						|
// The returned function should always be called to release resources.
 | 
						|
// An optional error can be sent which will be picked as text only error,
 | 
						|
// without its original type by the receiver.
 | 
						|
// waitForHTTPStream should be used to the receiving side.
 | 
						|
func streamHTTPResponse(w http.ResponseWriter) *httpStreamResponse {
 | 
						|
	doneCh := make(chan error)
 | 
						|
	blockCh := make(chan []byte)
 | 
						|
	h := httpStreamResponse{done: doneCh, block: blockCh}
 | 
						|
	go func() {
 | 
						|
		canWrite := true
 | 
						|
		write := func(b []byte) {
 | 
						|
			if canWrite {
 | 
						|
				n, err := w.Write(b)
 | 
						|
				if err != nil || n != len(b) {
 | 
						|
					canWrite = false
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		ticker := time.NewTicker(time.Second * 10)
 | 
						|
		defer ticker.Stop()
 | 
						|
		for {
 | 
						|
			select {
 | 
						|
			case <-ticker.C:
 | 
						|
				// Response not ready, write a filler byte.
 | 
						|
				write([]byte{32})
 | 
						|
				if canWrite {
 | 
						|
					w.(http.Flusher).Flush()
 | 
						|
				}
 | 
						|
			case err := <-doneCh:
 | 
						|
				if err != nil {
 | 
						|
					write([]byte{1})
 | 
						|
					write([]byte(err.Error()))
 | 
						|
				} else {
 | 
						|
					write([]byte{0})
 | 
						|
				}
 | 
						|
				xioutil.SafeClose(doneCh)
 | 
						|
				return
 | 
						|
			case block := <-blockCh:
 | 
						|
				var tmp [5]byte
 | 
						|
				tmp[0] = 2
 | 
						|
				binary.LittleEndian.PutUint32(tmp[1:], uint32(len(block)))
 | 
						|
				write(tmp[:])
 | 
						|
				write(block)
 | 
						|
				if canWrite {
 | 
						|
					w.(http.Flusher).Flush()
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	return &h
 | 
						|
}
 | 
						|
 | 
						|
var poolBuf8k = sync.Pool{
 | 
						|
	New: func() interface{} {
 | 
						|
		b := make([]byte, 8192)
 | 
						|
		return &b
 | 
						|
	},
 | 
						|
}
 | 
						|
 | 
						|
var poolBuf128k = sync.Pool{
 | 
						|
	New: func() interface{} {
 | 
						|
		b := make([]byte, 128<<10)
 | 
						|
		return b
 | 
						|
	},
 | 
						|
}
 | 
						|
 | 
						|
// waitForHTTPStream will wait for responses where
 | 
						|
// streamHTTPResponse has been used.
 | 
						|
// The returned reader contains the payload and must be closed if no error is returned.
 | 
						|
func waitForHTTPStream(respBody io.ReadCloser, w io.Writer) error {
 | 
						|
	var tmp [1]byte
 | 
						|
	// 8K copy buffer, reused for less allocs...
 | 
						|
	bufp := poolBuf8k.Get().(*[]byte)
 | 
						|
	buf := *bufp
 | 
						|
	defer poolBuf8k.Put(bufp)
 | 
						|
	for {
 | 
						|
		_, err := io.ReadFull(respBody, tmp[:])
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
		// Check if we have a response ready or a filler byte.
 | 
						|
		switch tmp[0] {
 | 
						|
		case 0:
 | 
						|
			// 0 is unbuffered, copy the rest.
 | 
						|
			_, err := io.CopyBuffer(w, respBody, buf)
 | 
						|
			if err == io.EOF {
 | 
						|
				return nil
 | 
						|
			}
 | 
						|
			return err
 | 
						|
		case 1:
 | 
						|
			errorText, err := io.ReadAll(respBody)
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
			return errors.New(string(errorText))
 | 
						|
		case 2:
 | 
						|
			// Block of data
 | 
						|
			var tmp [4]byte
 | 
						|
			_, err := io.ReadFull(respBody, tmp[:])
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
			length := binary.LittleEndian.Uint32(tmp[:])
 | 
						|
			n, err := io.CopyBuffer(w, io.LimitReader(respBody, int64(length)), buf)
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
			if n != int64(length) {
 | 
						|
				return io.ErrUnexpectedEOF
 | 
						|
			}
 | 
						|
			continue
 | 
						|
		case 32:
 | 
						|
			continue
 | 
						|
		default:
 | 
						|
			return fmt.Errorf("unexpected filler byte: %d", tmp[0])
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// VerifyFileResp - VerifyFile()'s response.
 | 
						|
type VerifyFileResp struct {
 | 
						|
	Err error
 | 
						|
}
 | 
						|
 | 
						|
// VerifyFileHandler - Verify all part of file for bitrot errors.
 | 
						|
func (s *storageRESTServer) VerifyFileHandler(w http.ResponseWriter, r *http.Request) {
 | 
						|
	if !s.IsValid(w, r) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	volume := r.Form.Get(storageRESTVolume)
 | 
						|
	filePath := r.Form.Get(storageRESTFilePath)
 | 
						|
 | 
						|
	if r.ContentLength < 0 {
 | 
						|
		s.writeErrorResponse(w, errInvalidArgument)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	var fi FileInfo
 | 
						|
	if err := msgp.Decode(r.Body, &fi); err != nil {
 | 
						|
		s.writeErrorResponse(w, err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	setEventStreamHeaders(w)
 | 
						|
	encoder := gob.NewEncoder(w)
 | 
						|
	done := keepHTTPResponseAlive(w)
 | 
						|
	err := s.getStorage().VerifyFile(r.Context(), volume, filePath, fi)
 | 
						|
	done(nil)
 | 
						|
	vresp := &VerifyFileResp{}
 | 
						|
	if err != nil {
 | 
						|
		vresp.Err = StorageErr(err.Error())
 | 
						|
	}
 | 
						|
	encoder.Encode(vresp)
 | 
						|
}
 | 
						|
 | 
						|
func checkDiskFatalErrs(errs []error) error {
 | 
						|
	// This returns a common error if all errors are
 | 
						|
	// same errors, then there is no point starting
 | 
						|
	// the server.
 | 
						|
	if countErrs(errs, errUnsupportedDisk) == len(errs) {
 | 
						|
		return errUnsupportedDisk
 | 
						|
	}
 | 
						|
 | 
						|
	if countErrs(errs, errDiskAccessDenied) == len(errs) {
 | 
						|
		return errDiskAccessDenied
 | 
						|
	}
 | 
						|
 | 
						|
	if countErrs(errs, errFileAccessDenied) == len(errs) {
 | 
						|
		return errDiskAccessDenied
 | 
						|
	}
 | 
						|
 | 
						|
	if countErrs(errs, errDiskNotDir) == len(errs) {
 | 
						|
		return errDiskNotDir
 | 
						|
	}
 | 
						|
 | 
						|
	if countErrs(errs, errFaultyDisk) == len(errs) {
 | 
						|
		return errFaultyDisk
 | 
						|
	}
 | 
						|
 | 
						|
	if countErrs(errs, errXLBackend) == len(errs) {
 | 
						|
		return errXLBackend
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// A single function to write certain errors to be fatal
 | 
						|
// or informative based on the `exit` flag, please look
 | 
						|
// at each implementation of error for added hints.
 | 
						|
//
 | 
						|
// FIXME: This is an unusual function but serves its purpose for
 | 
						|
// now, need to revisit the overall erroring structure here.
 | 
						|
// Do not like it :-(
 | 
						|
func logFatalErrs(err error, endpoint Endpoint, exit bool) {
 | 
						|
	switch {
 | 
						|
	case errors.Is(err, errXLBackend):
 | 
						|
		logger.Fatal(config.ErrInvalidXLValue(err), "Unable to initialize backend")
 | 
						|
	case errors.Is(err, errUnsupportedDisk):
 | 
						|
		var hint string
 | 
						|
		if endpoint.URL != nil {
 | 
						|
			hint = fmt.Sprintf("Drive '%s' does not support O_DIRECT flags, MinIO erasure coding requires filesystems with O_DIRECT support", endpoint.Path)
 | 
						|
		} else {
 | 
						|
			hint = "Drives do not support O_DIRECT flags, MinIO erasure coding requires filesystems with O_DIRECT support"
 | 
						|
		}
 | 
						|
		logger.Fatal(config.ErrUnsupportedBackend(err).Hint(hint), "Unable to initialize backend")
 | 
						|
	case errors.Is(err, errDiskNotDir):
 | 
						|
		var hint string
 | 
						|
		if endpoint.URL != nil {
 | 
						|
			hint = fmt.Sprintf("Drive '%s' is not a directory, MinIO erasure coding needs a directory", endpoint.Path)
 | 
						|
		} else {
 | 
						|
			hint = "Drives are not directories, MinIO erasure coding needs directories"
 | 
						|
		}
 | 
						|
		logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize backend")
 | 
						|
	case errors.Is(err, errDiskAccessDenied):
 | 
						|
		// Show a descriptive error with a hint about how to fix it.
 | 
						|
		var username string
 | 
						|
		if u, err := user.Current(); err == nil {
 | 
						|
			username = u.Username
 | 
						|
		} else {
 | 
						|
			username = "<your-username>"
 | 
						|
		}
 | 
						|
		var hint string
 | 
						|
		if endpoint.URL != nil {
 | 
						|
			hint = fmt.Sprintf("Run the following command to add write permissions: `sudo chown -R %s %s && sudo chmod u+rxw %s`",
 | 
						|
				username, endpoint.Path, endpoint.Path)
 | 
						|
		} else {
 | 
						|
			hint = fmt.Sprintf("Run the following command to add write permissions: `sudo chown -R %s. <path> && sudo chmod u+rxw <path>`", username)
 | 
						|
		}
 | 
						|
		if !exit {
 | 
						|
			logger.LogOnceIf(GlobalContext, fmt.Errorf("Drive is not writable %s, %s", endpoint, hint), "log-fatal-errs")
 | 
						|
		} else {
 | 
						|
			logger.Fatal(config.ErrUnableToWriteInBackend(err).Hint(hint), "Unable to initialize backend")
 | 
						|
		}
 | 
						|
	case errors.Is(err, errFaultyDisk):
 | 
						|
		if !exit {
 | 
						|
			logger.LogOnceIf(GlobalContext, fmt.Errorf("Drive is faulty at %s, please replace the drive - drive will be offline", endpoint), "log-fatal-errs")
 | 
						|
		} else {
 | 
						|
			logger.Fatal(err, "Unable to initialize backend")
 | 
						|
		}
 | 
						|
	case errors.Is(err, errDiskFull):
 | 
						|
		if !exit {
 | 
						|
			logger.LogOnceIf(GlobalContext, fmt.Errorf("Drive is already full at %s, incoming I/O will fail - drive will be offline", endpoint), "log-fatal-errs")
 | 
						|
		} else {
 | 
						|
			logger.Fatal(err, "Unable to initialize backend")
 | 
						|
		}
 | 
						|
	default:
 | 
						|
		if !exit {
 | 
						|
			logger.LogOnceIf(GlobalContext, fmt.Errorf("Drive %s returned an unexpected error: %w, please investigate - drive will be offline", endpoint, err), "log-fatal-errs")
 | 
						|
		} else {
 | 
						|
			logger.Fatal(err, "Unable to initialize backend")
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// StatInfoFile returns file stat info.
 | 
						|
func (s *storageRESTServer) StatInfoFile(w http.ResponseWriter, r *http.Request) {
 | 
						|
	if !s.IsValid(w, r) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	volume := r.Form.Get(storageRESTVolume)
 | 
						|
	filePath := r.Form.Get(storageRESTFilePath)
 | 
						|
	glob := r.Form.Get(storageRESTGlob)
 | 
						|
	done := keepHTTPResponseAlive(w)
 | 
						|
	stats, err := s.getStorage().StatInfoFile(r.Context(), volume, filePath, glob == "true")
 | 
						|
	done(err)
 | 
						|
	if err != nil {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	for _, si := range stats {
 | 
						|
		msgp.Encode(w, &si)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// ReadMultiple returns multiple files
 | 
						|
func (s *storageRESTServer) ReadMultiple(w http.ResponseWriter, r *http.Request) {
 | 
						|
	if !s.IsValid(w, r) {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	rw := streamHTTPResponse(w)
 | 
						|
	defer func() {
 | 
						|
		if r := recover(); r != nil {
 | 
						|
			debug.PrintStack()
 | 
						|
			rw.CloseWithError(fmt.Errorf("panic: %v", r))
 | 
						|
		}
 | 
						|
	}()
 | 
						|
 | 
						|
	var req ReadMultipleReq
 | 
						|
	mr := msgpNewReader(r.Body)
 | 
						|
	defer readMsgpReaderPoolPut(mr)
 | 
						|
	err := req.DecodeMsg(mr)
 | 
						|
	if err != nil {
 | 
						|
		rw.CloseWithError(err)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	mw := msgp.NewWriter(rw)
 | 
						|
	responses := make(chan ReadMultipleResp, len(req.Files))
 | 
						|
	var wg sync.WaitGroup
 | 
						|
	wg.Add(1)
 | 
						|
	go func() {
 | 
						|
		defer wg.Done()
 | 
						|
		for resp := range responses {
 | 
						|
			err := resp.EncodeMsg(mw)
 | 
						|
			if err != nil {
 | 
						|
				rw.CloseWithError(err)
 | 
						|
				return
 | 
						|
			}
 | 
						|
			mw.Flush()
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	err = s.getStorage().ReadMultiple(r.Context(), req, responses)
 | 
						|
	wg.Wait()
 | 
						|
	rw.CloseWithError(err)
 | 
						|
}
 | 
						|
 | 
						|
// globalLocalSetDrives is used for local drive as well as remote REST
 | 
						|
// API caller for other nodes to talk to this node.
 | 
						|
//
 | 
						|
// Any updates to this must be serialized via globalLocalDrivesMu (locker)
 | 
						|
var globalLocalSetDrives [][][]StorageAPI
 | 
						|
 | 
						|
// registerStorageRESTHandlers - register storage rpc router.
 | 
						|
func registerStorageRESTHandlers(router *mux.Router, endpointServerPools EndpointServerPools, gm *grid.Manager) {
 | 
						|
	h := func(f http.HandlerFunc) http.HandlerFunc {
 | 
						|
		return collectInternodeStats(httpTraceHdrs(f))
 | 
						|
	}
 | 
						|
 | 
						|
	globalLocalSetDrives = make([][][]StorageAPI, len(endpointServerPools))
 | 
						|
	for pool := range globalLocalSetDrives {
 | 
						|
		globalLocalSetDrives[pool] = make([][]StorageAPI, endpointServerPools[pool].SetCount)
 | 
						|
		for set := range globalLocalSetDrives[pool] {
 | 
						|
			globalLocalSetDrives[pool][set] = make([]StorageAPI, endpointServerPools[pool].DrivesPerSet)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	for _, serverPool := range endpointServerPools {
 | 
						|
		for _, endpoint := range serverPool.Endpoints {
 | 
						|
			if !endpoint.IsLocal {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			server := &storageRESTServer{
 | 
						|
				endpoint: endpoint,
 | 
						|
			}
 | 
						|
 | 
						|
			subrouter := router.PathPrefix(path.Join(storageRESTPrefix, endpoint.Path)).Subrouter()
 | 
						|
 | 
						|
			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodHealth).HandlerFunc(h(server.HealthHandler))
 | 
						|
			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodAppendFile).HandlerFunc(h(server.AppendFileHandler))
 | 
						|
			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadVersion).HandlerFunc(h(server.ReadVersionHandler))
 | 
						|
			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadXL).HandlerFunc(h(server.ReadXLHandler))
 | 
						|
			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCreateFile).HandlerFunc(h(server.CreateFileHandler))
 | 
						|
			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadFile).HandlerFunc(h(server.ReadFileHandler))
 | 
						|
			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadFileStream).HandlerFunc(h(server.ReadFileStreamHandler))
 | 
						|
 | 
						|
			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodDeleteVersions).HandlerFunc(h(server.DeleteVersionsHandler))
 | 
						|
			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodVerifyFile).HandlerFunc(h(server.VerifyFileHandler))
 | 
						|
			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodStatInfoFile).HandlerFunc(h(server.StatInfoFile))
 | 
						|
			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodReadMultiple).HandlerFunc(h(server.ReadMultiple))
 | 
						|
			subrouter.Methods(http.MethodPost).Path(storageRESTVersionPrefix + storageRESTMethodCleanAbandoned).HandlerFunc(h(server.CleanAbandonedDataHandler))
 | 
						|
			logger.FatalIf(storageListDirRPC.RegisterNoInput(gm, server.ListDirHandler, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(storageReadAllRPC.Register(gm, server.ReadAllHandler, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(storageWriteAllRPC.Register(gm, server.WriteAllHandler, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(storageRenameFileRPC.Register(gm, server.RenameFileHandler, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(storageRenameDataRPC.Register(gm, server.RenameDataHandler, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(storageDeleteFileRPC.Register(gm, server.DeleteFileHandler, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(storageCheckPartsRPC.Register(gm, server.CheckPartsHandler, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(storageReadVersionRPC.Register(gm, server.ReadVersionHandlerWS, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(storageWriteMetadataRPC.Register(gm, server.WriteMetadataHandler, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(storageUpdateMetadataRPC.Register(gm, server.UpdateMetadataHandler, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(storageDeleteVersionRPC.Register(gm, server.DeleteVersionHandler, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(storageReadXLRPC.Register(gm, server.ReadXLHandlerWS, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(storageNSScannerRPC.RegisterNoInput(gm, server.NSScannerHandler, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(storageDiskInfoRPC.Register(gm, server.DiskInfoHandler, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(storageStatVolRPC.Register(gm, server.StatVolHandler, endpoint.Path), "unable to register handler")
 | 
						|
			logger.FatalIf(gm.RegisterStreamingHandler(grid.HandlerWalkDir, grid.StreamHandler{
 | 
						|
				Subroute:    endpoint.Path,
 | 
						|
				Handle:      server.WalkDirHandler,
 | 
						|
				OutCapacity: 1,
 | 
						|
			}), "unable to register handler")
 | 
						|
 | 
						|
			createStorage := func(server *storageRESTServer) bool {
 | 
						|
				xl, err := newXLStorage(endpoint, false)
 | 
						|
				if err != nil {
 | 
						|
					// if supported errors don't fail, we proceed to
 | 
						|
					// printing message and moving forward.
 | 
						|
					if errors.Is(err, errDriveIsRoot) {
 | 
						|
						err = fmt.Errorf("major: %v: minor: %v: %w", xl.major, xl.minor, err)
 | 
						|
					}
 | 
						|
					logFatalErrs(err, endpoint, false)
 | 
						|
					return false
 | 
						|
				}
 | 
						|
				storage := newXLStorageDiskIDCheck(xl, true)
 | 
						|
				storage.SetDiskID(xl.diskID)
 | 
						|
				// We do not have to do SetFormatData() since 'xl'
 | 
						|
				// already captures formatData cached.
 | 
						|
 | 
						|
				globalLocalDrivesMu.Lock()
 | 
						|
				defer globalLocalDrivesMu.Unlock()
 | 
						|
 | 
						|
				globalLocalDrives = append(globalLocalDrives, storage)
 | 
						|
				globalLocalSetDrives[endpoint.PoolIdx][endpoint.SetIdx][endpoint.DiskIdx] = storage
 | 
						|
				return true
 | 
						|
			}
 | 
						|
 | 
						|
			if createStorage(server) {
 | 
						|
				continue
 | 
						|
			}
 | 
						|
 | 
						|
			// Start async goroutine to create storage.
 | 
						|
			go func(server *storageRESTServer) {
 | 
						|
				for {
 | 
						|
					time.Sleep(3 * time.Second)
 | 
						|
					if createStorage(server) {
 | 
						|
						return
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}(server)
 | 
						|
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 |