mirror of
				https://github.com/minio/minio.git
				synced 2025-11-04 02:01:05 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			296 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			296 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/*
 | 
						|
 * Minio Cloud Storage, (C) 2014, 2015, 2016, 2017, 2018 Minio, Inc.
 | 
						|
 *
 | 
						|
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
 * you may not use this file except in compliance with the License.
 | 
						|
 * You may obtain a copy of the License at
 | 
						|
 *
 | 
						|
 *     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
 *
 | 
						|
 * Unless required by applicable law or agreed to in writing, software
 | 
						|
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
 * See the License for the specific language governing permissions and
 | 
						|
 * limitations under the License.
 | 
						|
 */
 | 
						|
 | 
						|
package cmd
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"crypto/tls"
 | 
						|
	"fmt"
 | 
						|
	"net"
 | 
						|
	"sort"
 | 
						|
	"strings"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/minio/minio/cmd/logger"
 | 
						|
	xnet "github.com/minio/minio/pkg/net"
 | 
						|
)
 | 
						|
 | 
						|
var errUnsupportedSignal = fmt.Errorf("unsupported signal: only restart and stop signals are supported")
 | 
						|
 | 
						|
// AdminRPCClient - admin RPC client talks to admin RPC server.
 | 
						|
type AdminRPCClient struct {
 | 
						|
	*RPCClient
 | 
						|
}
 | 
						|
 | 
						|
// SignalService - calls SignalService RPC.
 | 
						|
func (rpcClient *AdminRPCClient) SignalService(signal serviceSignal) (err error) {
 | 
						|
	args := SignalServiceArgs{Sig: signal}
 | 
						|
	reply := VoidReply{}
 | 
						|
 | 
						|
	return rpcClient.Call(adminServiceName+".SignalService", &args, &reply)
 | 
						|
}
 | 
						|
 | 
						|
// ReInitFormat - re-initialize disk format, remotely.
 | 
						|
func (rpcClient *AdminRPCClient) ReInitFormat(dryRun bool) error {
 | 
						|
	args := ReInitFormatArgs{DryRun: dryRun}
 | 
						|
	reply := VoidReply{}
 | 
						|
 | 
						|
	return rpcClient.Call(adminServiceName+".ReInitFormat", &args, &reply)
 | 
						|
}
 | 
						|
 | 
						|
// ServerInfo - returns the server info of the server to which the RPC call is made.
 | 
						|
func (rpcClient *AdminRPCClient) ServerInfo() (sid ServerInfoData, err error) {
 | 
						|
	err = rpcClient.Call(adminServiceName+".ServerInfo", &AuthArgs{}, &sid)
 | 
						|
	return sid, err
 | 
						|
}
 | 
						|
 | 
						|
// GetConfig - returns config.json of the remote server.
 | 
						|
func (rpcClient *AdminRPCClient) GetConfig() ([]byte, error) {
 | 
						|
	args := AuthArgs{}
 | 
						|
	var reply []byte
 | 
						|
 | 
						|
	err := rpcClient.Call(adminServiceName+".GetConfig", &args, &reply)
 | 
						|
	return reply, err
 | 
						|
}
 | 
						|
 | 
						|
// StartProfiling - starts profiling in the remote server.
 | 
						|
func (rpcClient *AdminRPCClient) StartProfiling(profiler string) error {
 | 
						|
	args := StartProfilingArgs{Profiler: profiler}
 | 
						|
	reply := VoidReply{}
 | 
						|
	return rpcClient.Call(adminServiceName+".StartProfiling", &args, &reply)
 | 
						|
}
 | 
						|
 | 
						|
// DownloadProfilingData - returns profiling data of the remote server.
 | 
						|
func (rpcClient *AdminRPCClient) DownloadProfilingData() ([]byte, error) {
 | 
						|
	args := AuthArgs{}
 | 
						|
	var reply []byte
 | 
						|
 | 
						|
	err := rpcClient.Call(adminServiceName+".DownloadProfilingData", &args, &reply)
 | 
						|
	return reply, err
 | 
						|
}
 | 
						|
 | 
						|
// NewAdminRPCClient - returns new admin RPC client.
 | 
						|
func NewAdminRPCClient(host *xnet.Host) (*AdminRPCClient, error) {
 | 
						|
	scheme := "http"
 | 
						|
	if globalIsSSL {
 | 
						|
		scheme = "https"
 | 
						|
	}
 | 
						|
 | 
						|
	serviceURL := &xnet.URL{
 | 
						|
		Scheme: scheme,
 | 
						|
		Host:   host.String(),
 | 
						|
		Path:   adminServicePath,
 | 
						|
	}
 | 
						|
 | 
						|
	var tlsConfig *tls.Config
 | 
						|
	if globalIsSSL {
 | 
						|
		tlsConfig = &tls.Config{
 | 
						|
			ServerName: host.Name,
 | 
						|
			RootCAs:    globalRootCAs,
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	rpcClient, err := NewRPCClient(
 | 
						|
		RPCClientArgs{
 | 
						|
			NewAuthTokenFunc: newAuthToken,
 | 
						|
			RPCVersion:       globalRPCAPIVersion,
 | 
						|
			ServiceName:      adminServiceName,
 | 
						|
			ServiceURL:       serviceURL,
 | 
						|
			TLSConfig:        tlsConfig,
 | 
						|
		},
 | 
						|
	)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	return &AdminRPCClient{rpcClient}, nil
 | 
						|
}
 | 
						|
 | 
						|
// adminCmdRunner - abstracts local and remote execution of admin
 | 
						|
// commands like service stop and service restart.
 | 
						|
type adminCmdRunner interface {
 | 
						|
	SignalService(s serviceSignal) error
 | 
						|
	ReInitFormat(dryRun bool) error
 | 
						|
	ServerInfo() (ServerInfoData, error)
 | 
						|
	GetConfig() ([]byte, error)
 | 
						|
	StartProfiling(string) error
 | 
						|
	DownloadProfilingData() ([]byte, error)
 | 
						|
}
 | 
						|
 | 
						|
// adminPeer - represents an entity that implements admin API RPCs.
 | 
						|
type adminPeer struct {
 | 
						|
	addr      string
 | 
						|
	cmdRunner adminCmdRunner
 | 
						|
	isLocal   bool
 | 
						|
}
 | 
						|
 | 
						|
// type alias for a collection of adminPeer.
 | 
						|
type adminPeers []adminPeer
 | 
						|
 | 
						|
// makeAdminPeers - helper function to construct a collection of adminPeer.
 | 
						|
func makeAdminPeers(endpoints EndpointList) (adminPeerList adminPeers) {
 | 
						|
	localAddr := GetLocalPeer(endpoints)
 | 
						|
	if strings.HasPrefix(localAddr, "127.0.0.1:") {
 | 
						|
		// Use first IPv4 instead of loopback address.
 | 
						|
		localAddr = net.JoinHostPort(sortIPs(localIP4.ToSlice())[0], globalMinioPort)
 | 
						|
	}
 | 
						|
	adminPeerList = append(adminPeerList, adminPeer{
 | 
						|
		addr:      localAddr,
 | 
						|
		cmdRunner: localAdminClient{},
 | 
						|
		isLocal:   true,
 | 
						|
	})
 | 
						|
 | 
						|
	for _, hostStr := range GetRemotePeers(endpoints) {
 | 
						|
		host, err := xnet.ParseHost(hostStr)
 | 
						|
		logger.FatalIf(err, "Unable to parse Admin RPC Host")
 | 
						|
		rpcClient, err := NewAdminRPCClient(host)
 | 
						|
		logger.FatalIf(err, "Unable to initialize Admin RPC Client")
 | 
						|
		adminPeerList = append(adminPeerList, adminPeer{
 | 
						|
			addr:      hostStr,
 | 
						|
			cmdRunner: rpcClient,
 | 
						|
		})
 | 
						|
	}
 | 
						|
 | 
						|
	return adminPeerList
 | 
						|
}
 | 
						|
 | 
						|
// peersReInitFormat - reinitialize remote object layers to new format.
 | 
						|
func peersReInitFormat(peers adminPeers, dryRun bool) error {
 | 
						|
	errs := make([]error, len(peers))
 | 
						|
 | 
						|
	// Send ReInitFormat RPC call to all nodes.
 | 
						|
	// for local adminPeer this is a no-op.
 | 
						|
	wg := sync.WaitGroup{}
 | 
						|
	for i, peer := range peers {
 | 
						|
		wg.Add(1)
 | 
						|
		go func(idx int, peer adminPeer) {
 | 
						|
			defer wg.Done()
 | 
						|
			if !peer.isLocal {
 | 
						|
				errs[idx] = peer.cmdRunner.ReInitFormat(dryRun)
 | 
						|
			}
 | 
						|
		}(i, peer)
 | 
						|
	}
 | 
						|
	wg.Wait()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Initialize global adminPeer collection.
 | 
						|
func initGlobalAdminPeers(endpoints EndpointList) {
 | 
						|
	globalAdminPeers = makeAdminPeers(endpoints)
 | 
						|
}
 | 
						|
 | 
						|
// invokeServiceCmd - Invoke Restart/Stop command.
 | 
						|
func invokeServiceCmd(cp adminPeer, cmd serviceSignal) (err error) {
 | 
						|
	switch cmd {
 | 
						|
	case serviceRestart, serviceStop:
 | 
						|
		err = cp.cmdRunner.SignalService(cmd)
 | 
						|
	}
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// sendServiceCmd - Invoke Restart command on remote peers
 | 
						|
// adminPeer followed by on the local peer.
 | 
						|
func sendServiceCmd(cps adminPeers, cmd serviceSignal) {
 | 
						|
	// Send service command like stop or restart to all remote nodes and finally run on local node.
 | 
						|
	errs := make([]error, len(cps))
 | 
						|
	var wg sync.WaitGroup
 | 
						|
	remotePeers := cps[1:]
 | 
						|
	for i := range remotePeers {
 | 
						|
		wg.Add(1)
 | 
						|
		go func(idx int) {
 | 
						|
			defer wg.Done()
 | 
						|
			// we use idx+1 because remotePeers slice is 1 position shifted w.r.t cps
 | 
						|
			errs[idx+1] = invokeServiceCmd(remotePeers[idx], cmd)
 | 
						|
		}(i)
 | 
						|
	}
 | 
						|
	wg.Wait()
 | 
						|
	errs[0] = invokeServiceCmd(cps[0], cmd)
 | 
						|
}
 | 
						|
 | 
						|
// uptimeSlice - used to sort uptimes in chronological order.
 | 
						|
type uptimeSlice []struct {
 | 
						|
	err    error
 | 
						|
	uptime time.Duration
 | 
						|
}
 | 
						|
 | 
						|
func (ts uptimeSlice) Len() int {
 | 
						|
	return len(ts)
 | 
						|
}
 | 
						|
 | 
						|
func (ts uptimeSlice) Less(i, j int) bool {
 | 
						|
	return ts[i].uptime < ts[j].uptime
 | 
						|
}
 | 
						|
 | 
						|
func (ts uptimeSlice) Swap(i, j int) {
 | 
						|
	ts[i], ts[j] = ts[j], ts[i]
 | 
						|
}
 | 
						|
 | 
						|
// getPeerUptimes - returns the uptime since the last time read quorum
 | 
						|
// was established on success. Otherwise returns errXLReadQuorum.
 | 
						|
func getPeerUptimes(peers adminPeers) (time.Duration, error) {
 | 
						|
	// In a single node Erasure or FS backend setup the uptime of
 | 
						|
	// the setup is the uptime of the single minio server
 | 
						|
	// instance.
 | 
						|
	if !globalIsDistXL {
 | 
						|
		return UTCNow().Sub(globalBootTime), nil
 | 
						|
	}
 | 
						|
 | 
						|
	uptimes := make(uptimeSlice, len(peers))
 | 
						|
 | 
						|
	// Get up time of all servers.
 | 
						|
	wg := sync.WaitGroup{}
 | 
						|
	for i, peer := range peers {
 | 
						|
		wg.Add(1)
 | 
						|
		go func(idx int, peer adminPeer) {
 | 
						|
			defer wg.Done()
 | 
						|
			serverInfoData, rpcErr := peer.cmdRunner.ServerInfo()
 | 
						|
			uptimes[idx].uptime, uptimes[idx].err = serverInfoData.Properties.Uptime, rpcErr
 | 
						|
		}(i, peer)
 | 
						|
	}
 | 
						|
	wg.Wait()
 | 
						|
 | 
						|
	// Sort uptimes in chronological order.
 | 
						|
	sort.Sort(uptimes)
 | 
						|
 | 
						|
	// Pick the readQuorum'th uptime in chronological order. i.e,
 | 
						|
	// the time at which read quorum was (re-)established.
 | 
						|
	readQuorum := len(uptimes) / 2
 | 
						|
	validCount := 0
 | 
						|
	latestUptime := time.Duration(0)
 | 
						|
	for _, uptime := range uptimes {
 | 
						|
		if uptime.err != nil {
 | 
						|
			logger.LogIf(context.Background(), uptime.err)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		validCount++
 | 
						|
		if validCount >= readQuorum {
 | 
						|
			latestUptime = uptime.uptime
 | 
						|
			break
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Less than readQuorum "Admin.Uptime" RPC call returned
 | 
						|
	// successfully, so read-quorum unavailable.
 | 
						|
	if validCount < readQuorum {
 | 
						|
		return time.Duration(0), InsufficientReadQuorum{}
 | 
						|
	}
 | 
						|
 | 
						|
	return latestUptime, nil
 | 
						|
}
 |