mirror of
				https://github.com/minio/minio.git
				synced 2025-11-04 02:01:05 +01:00 
			
		
		
		
	Add `ConnDialer` to abstract connection creation. - `IncomingConn(ctx context.Context, conn net.Conn)` is provided as an entry point for incoming custom connections. - `ConnectWS` is provided to create web socket connections.
		
			
				
	
	
		
			375 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			375 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright (c) 2015-2023 MinIO, Inc.
 | 
						|
//
 | 
						|
// This file is part of MinIO Object Storage stack
 | 
						|
//
 | 
						|
// This program is free software: you can redistribute it and/or modify
 | 
						|
// it under the terms of the GNU Affero General Public License as published by
 | 
						|
// the Free Software Foundation, either version 3 of the License, or
 | 
						|
// (at your option) any later version.
 | 
						|
//
 | 
						|
// This program is distributed in the hope that it will be useful
 | 
						|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
						|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
						|
// GNU Affero General Public License for more details.
 | 
						|
//
 | 
						|
// You should have received a copy of the GNU Affero General Public License
 | 
						|
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | 
						|
 | 
						|
package grid
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"net"
 | 
						|
	"net/http"
 | 
						|
	"runtime/debug"
 | 
						|
	"strings"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/gobwas/ws"
 | 
						|
	"github.com/gobwas/ws/wsutil"
 | 
						|
	"github.com/google/uuid"
 | 
						|
	"github.com/minio/madmin-go/v3"
 | 
						|
	"github.com/minio/minio/internal/pubsub"
 | 
						|
	"github.com/minio/mux"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	// apiVersion is a major version of the entire api.
 | 
						|
	// Bumping this should only be done when overall,
 | 
						|
	// incompatible changes are made, not when adding a new handler
 | 
						|
	// or changing an existing handler.
 | 
						|
	apiVersion = "v1"
 | 
						|
 | 
						|
	// RoutePath is the remote path to connect to.
 | 
						|
	RoutePath = "/minio/grid/" + apiVersion
 | 
						|
)
 | 
						|
 | 
						|
// Manager will contain all the connections to the grid.
 | 
						|
// It also handles incoming requests and routes them to the appropriate connection.
 | 
						|
type Manager struct {
 | 
						|
	// ID is an instance ID, that will change whenever the server restarts.
 | 
						|
	// This allows remotes to keep track of whether state is preserved.
 | 
						|
	ID uuid.UUID
 | 
						|
 | 
						|
	// Immutable after creation, so no locks.
 | 
						|
	targets map[string]*Connection
 | 
						|
 | 
						|
	// serverside handlers.
 | 
						|
	handlers handlers
 | 
						|
 | 
						|
	// local host name.
 | 
						|
	local string
 | 
						|
 | 
						|
	// authToken is a function that will validate a token.
 | 
						|
	authToken ValidateTokenFn
 | 
						|
}
 | 
						|
 | 
						|
// ManagerOptions are options for creating a new grid manager.
 | 
						|
type ManagerOptions struct {
 | 
						|
	Local        string        // Local host name.
 | 
						|
	Hosts        []string      // All hosts, including local in the grid.
 | 
						|
	Incoming     func(n int64) // Record incoming bytes.
 | 
						|
	Outgoing     func(n int64) // Record outgoing bytes.
 | 
						|
	BlockConnect chan struct{} // If set, incoming and outgoing connections will be blocked until closed.
 | 
						|
	TraceTo      *pubsub.PubSub[madmin.TraceInfo, madmin.TraceType]
 | 
						|
	Dialer       ConnDialer
 | 
						|
	// Sign a token for the given audience.
 | 
						|
	AuthFn AuthFn
 | 
						|
	// Callbacks to validate incoming connections.
 | 
						|
	AuthToken ValidateTokenFn
 | 
						|
}
 | 
						|
 | 
						|
// NewManager creates a new grid manager
 | 
						|
func NewManager(ctx context.Context, o ManagerOptions) (*Manager, error) {
 | 
						|
	found := false
 | 
						|
	if o.AuthToken == nil {
 | 
						|
		return nil, fmt.Errorf("grid: AuthToken not set")
 | 
						|
	}
 | 
						|
	if o.Dialer == nil {
 | 
						|
		return nil, fmt.Errorf("grid: Dialer not set")
 | 
						|
	}
 | 
						|
	if o.AuthFn == nil {
 | 
						|
		return nil, fmt.Errorf("grid: AuthFn not set")
 | 
						|
	}
 | 
						|
	m := &Manager{
 | 
						|
		ID:        uuid.New(),
 | 
						|
		targets:   make(map[string]*Connection, len(o.Hosts)),
 | 
						|
		local:     o.Local,
 | 
						|
		authToken: o.AuthToken,
 | 
						|
	}
 | 
						|
	m.handlers.init()
 | 
						|
	if ctx == nil {
 | 
						|
		ctx = context.Background()
 | 
						|
	}
 | 
						|
 | 
						|
	for _, host := range o.Hosts {
 | 
						|
		if host == o.Local {
 | 
						|
			if found {
 | 
						|
				return nil, fmt.Errorf("grid: local host found multiple times")
 | 
						|
			}
 | 
						|
			found = true
 | 
						|
			// No connection to local.
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		m.targets[host] = newConnection(connectionParams{
 | 
						|
			ctx:           ctx,
 | 
						|
			id:            m.ID,
 | 
						|
			local:         o.Local,
 | 
						|
			remote:        host,
 | 
						|
			handlers:      &m.handlers,
 | 
						|
			blockConnect:  o.BlockConnect,
 | 
						|
			publisher:     o.TraceTo,
 | 
						|
			incomingBytes: o.Incoming,
 | 
						|
			outgoingBytes: o.Outgoing,
 | 
						|
			dialer:        o.Dialer,
 | 
						|
			authFn:        o.AuthFn,
 | 
						|
		})
 | 
						|
	}
 | 
						|
	if !found {
 | 
						|
		return nil, fmt.Errorf("grid: local host (%s) not found in cluster setup", o.Local)
 | 
						|
	}
 | 
						|
 | 
						|
	return m, nil
 | 
						|
}
 | 
						|
 | 
						|
// AddToMux will add the grid manager to the given mux.
 | 
						|
func (m *Manager) AddToMux(router *mux.Router, authReq func(r *http.Request) error) {
 | 
						|
	router.Handle(RoutePath, m.Handler(authReq))
 | 
						|
}
 | 
						|
 | 
						|
// Handler returns a handler that can be used to serve grid requests.
 | 
						|
// This should be connected on RoutePath to the main server.
 | 
						|
func (m *Manager) Handler(authReq func(r *http.Request) error) http.HandlerFunc {
 | 
						|
	return func(w http.ResponseWriter, req *http.Request) {
 | 
						|
		defer func() {
 | 
						|
			if debugPrint {
 | 
						|
				fmt.Printf("grid: Handler returning from: %v %v\n", req.Method, req.URL)
 | 
						|
			}
 | 
						|
			if r := recover(); r != nil {
 | 
						|
				debug.PrintStack()
 | 
						|
				err := fmt.Errorf("grid: panic: %v\n", r)
 | 
						|
				gridLogIf(context.Background(), err, err.Error())
 | 
						|
				w.WriteHeader(http.StatusInternalServerError)
 | 
						|
			}
 | 
						|
		}()
 | 
						|
		if debugPrint {
 | 
						|
			fmt.Printf("grid: Got a %s request for: %v\n", req.Method, req.URL)
 | 
						|
		}
 | 
						|
		ctx := req.Context()
 | 
						|
		if err := authReq(req); err != nil {
 | 
						|
			gridLogOnceIf(ctx, fmt.Errorf("auth %s: %w", req.RemoteAddr, err), req.RemoteAddr)
 | 
						|
			w.WriteHeader(http.StatusForbidden)
 | 
						|
			return
 | 
						|
		}
 | 
						|
		conn, _, _, err := ws.UpgradeHTTP(req, w)
 | 
						|
		if err != nil {
 | 
						|
			if debugPrint {
 | 
						|
				fmt.Printf("grid: Unable to upgrade: %v. http.ResponseWriter is type %T\n", err, w)
 | 
						|
			}
 | 
						|
			w.WriteHeader(http.StatusUpgradeRequired)
 | 
						|
			return
 | 
						|
		}
 | 
						|
		m.IncomingConn(ctx, conn)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// IncomingConn will handle an incoming connection.
 | 
						|
// This should be called with the incoming connection after accept.
 | 
						|
// Auth is handled internally, as well as disconnecting any connections from the same host.
 | 
						|
func (m *Manager) IncomingConn(ctx context.Context, conn net.Conn) {
 | 
						|
	remoteAddr := conn.RemoteAddr().String()
 | 
						|
	// will write an OpConnectResponse message to the remote and log it once locally.
 | 
						|
	defer conn.Close()
 | 
						|
	writeErr := func(err error) {
 | 
						|
		if err == nil {
 | 
						|
			return
 | 
						|
		}
 | 
						|
		if errors.Is(err, io.EOF) {
 | 
						|
			return
 | 
						|
		}
 | 
						|
		gridLogOnceIf(ctx, err, remoteAddr)
 | 
						|
		resp := connectResp{
 | 
						|
			ID:             m.ID,
 | 
						|
			Accepted:       false,
 | 
						|
			RejectedReason: err.Error(),
 | 
						|
		}
 | 
						|
		if b, err := resp.MarshalMsg(nil); err == nil {
 | 
						|
			msg := message{
 | 
						|
				Op:      OpConnectResponse,
 | 
						|
				Payload: b,
 | 
						|
			}
 | 
						|
			if b, err := msg.MarshalMsg(nil); err == nil {
 | 
						|
				wsutil.WriteMessage(conn, ws.StateServerSide, ws.OpBinary, b)
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	defer conn.Close()
 | 
						|
	if debugPrint {
 | 
						|
		fmt.Printf("grid: Upgraded request: %v\n", remoteAddr)
 | 
						|
	}
 | 
						|
 | 
						|
	msg, _, err := wsutil.ReadClientData(conn)
 | 
						|
	if err != nil {
 | 
						|
		writeErr(fmt.Errorf("reading connect: %w", err))
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if debugPrint {
 | 
						|
		fmt.Printf("%s handler: Got message, length %v\n", m.local, len(msg))
 | 
						|
	}
 | 
						|
 | 
						|
	var message message
 | 
						|
	_, _, err = message.parse(msg)
 | 
						|
	if err != nil {
 | 
						|
		writeErr(fmt.Errorf("error parsing grid connect: %w", err))
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if message.Op != OpConnect {
 | 
						|
		writeErr(fmt.Errorf("unexpected connect op: %v", message.Op))
 | 
						|
		return
 | 
						|
	}
 | 
						|
	var cReq connectReq
 | 
						|
	_, err = cReq.UnmarshalMsg(message.Payload)
 | 
						|
	if err != nil {
 | 
						|
		writeErr(fmt.Errorf("error parsing connectReq: %w", err))
 | 
						|
		return
 | 
						|
	}
 | 
						|
	remote := m.targets[cReq.Host]
 | 
						|
	if remote == nil {
 | 
						|
		writeErr(fmt.Errorf("unknown incoming host: %v", cReq.Host))
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if time.Since(cReq.Time).Abs() > 5*time.Minute {
 | 
						|
		writeErr(fmt.Errorf("time difference too large between servers: %v", time.Since(cReq.Time).Abs()))
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if err := m.authToken(cReq.Token, cReq.audience()); err != nil {
 | 
						|
		writeErr(fmt.Errorf("auth token: %w", err))
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	if debugPrint {
 | 
						|
		fmt.Printf("handler: Got Connect Req %+v\n", cReq)
 | 
						|
	}
 | 
						|
	writeErr(remote.handleIncoming(ctx, conn, cReq))
 | 
						|
}
 | 
						|
 | 
						|
// AuthFn should provide an authentication string for the given aud.
 | 
						|
type AuthFn func(aud string) string
 | 
						|
 | 
						|
// ValidateAuthFn should check authentication for the given aud.
 | 
						|
type ValidateAuthFn func(auth, aud string) string
 | 
						|
 | 
						|
// Connection will return the connection for the specified host.
 | 
						|
// If the host does not exist nil will be returned.
 | 
						|
func (m *Manager) Connection(host string) *Connection {
 | 
						|
	return m.targets[host]
 | 
						|
}
 | 
						|
 | 
						|
// RegisterSingleHandler will register a stateless handler that serves
 | 
						|
// []byte -> ([]byte, error) requests.
 | 
						|
// subroutes are joined with "/" to a single subroute.
 | 
						|
func (m *Manager) RegisterSingleHandler(id HandlerID, h SingleHandlerFn, subroute ...string) error {
 | 
						|
	if !id.valid() {
 | 
						|
		return ErrUnknownHandler
 | 
						|
	}
 | 
						|
	s := strings.Join(subroute, "/")
 | 
						|
	if debugPrint {
 | 
						|
		fmt.Println("RegisterSingleHandler: ", id.String(), "subroute:", s)
 | 
						|
	}
 | 
						|
 | 
						|
	if len(subroute) == 0 {
 | 
						|
		if m.handlers.hasAny(id) && !id.isTestHandler() {
 | 
						|
			return fmt.Errorf("handler %v: %w", id.String(), ErrHandlerAlreadyExists)
 | 
						|
		}
 | 
						|
 | 
						|
		m.handlers.single[id] = h
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	subID := makeSubHandlerID(id, s)
 | 
						|
	if m.handlers.hasSubhandler(subID) && !id.isTestHandler() {
 | 
						|
		return fmt.Errorf("handler %v, subroute:%v: %w", id.String(), s, ErrHandlerAlreadyExists)
 | 
						|
	}
 | 
						|
	m.handlers.subSingle[subID] = h
 | 
						|
	// Copy so clients can also pick it up for other subpaths.
 | 
						|
	m.handlers.subSingle[makeZeroSubHandlerID(id)] = h
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
// RegisterStateless will register a stateless handler that serves
 | 
						|
// []byte -> stream of ([]byte, error) requests.
 | 
						|
func (m *Manager) RegisterStateless(id HandlerID, h StatelessHandler) error {
 | 
						|
	if !id.valid() {
 | 
						|
		return ErrUnknownHandler
 | 
						|
	}
 | 
						|
	if m.handlers.hasAny(id) && !id.isTestHandler() {
 | 
						|
		return ErrHandlerAlreadyExists
 | 
						|
	}
 | 
						|
 | 
						|
	m.handlers.stateless[id] = &h
 | 
						|
	return nil
 | 
						|
}
 | 
						|
*/
 | 
						|
 | 
						|
// RegisterStreamingHandler will register a stateless handler that serves
 | 
						|
// two-way streaming requests.
 | 
						|
func (m *Manager) RegisterStreamingHandler(id HandlerID, h StreamHandler) error {
 | 
						|
	if !id.valid() {
 | 
						|
		return ErrUnknownHandler
 | 
						|
	}
 | 
						|
	if debugPrint {
 | 
						|
		fmt.Println("RegisterStreamingHandler: subroute:", h.Subroute)
 | 
						|
	}
 | 
						|
	if h.Subroute == "" {
 | 
						|
		if m.handlers.hasAny(id) && !id.isTestHandler() {
 | 
						|
			return ErrHandlerAlreadyExists
 | 
						|
		}
 | 
						|
		m.handlers.streams[id] = &h
 | 
						|
		return nil
 | 
						|
	}
 | 
						|
	subID := makeSubHandlerID(id, h.Subroute)
 | 
						|
	if m.handlers.hasSubhandler(subID) && !id.isTestHandler() {
 | 
						|
		return ErrHandlerAlreadyExists
 | 
						|
	}
 | 
						|
	m.handlers.subStreams[subID] = &h
 | 
						|
	// Copy so clients can also pick it up for other subpaths.
 | 
						|
	m.handlers.subStreams[makeZeroSubHandlerID(id)] = &h
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// HostName returns the name of the local host.
 | 
						|
func (m *Manager) HostName() string {
 | 
						|
	return m.local
 | 
						|
}
 | 
						|
 | 
						|
// Targets returns the names of all remote targets.
 | 
						|
func (m *Manager) Targets() []string {
 | 
						|
	var res []string
 | 
						|
	for k := range m.targets {
 | 
						|
		res = append(res, k)
 | 
						|
	}
 | 
						|
	return res
 | 
						|
}
 | 
						|
 | 
						|
// debugMsg should *only* be used by tests.
 | 
						|
//
 | 
						|
//lint:ignore U1000 This is used by tests.
 | 
						|
func (m *Manager) debugMsg(d debugMsg, args ...any) {
 | 
						|
	for _, c := range m.targets {
 | 
						|
		c.debugMsg(d, args...)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// ConnStats returns the connection statistics for all connections.
 | 
						|
func (m *Manager) ConnStats() madmin.RPCMetrics {
 | 
						|
	var res madmin.RPCMetrics
 | 
						|
	for _, c := range m.targets {
 | 
						|
		t := c.Stats()
 | 
						|
		res.Merge(&t)
 | 
						|
	}
 | 
						|
	return res
 | 
						|
}
 |