diff --git a/cmd/generic-handlers.go b/cmd/generic-handlers.go index defd66e6c..9298ee405 100644 --- a/cmd/generic-handlers.go +++ b/cmd/generic-handlers.go @@ -247,8 +247,11 @@ func guessIsRPCReq(req *http.Request) bool { if req == nil { return false } - if req.Method == http.MethodGet && req.URL != nil && req.URL.Path == grid.RoutePath { - return true + if req.Method == http.MethodGet && req.URL != nil { + switch req.URL.Path { + case grid.RoutePath, grid.RouteLockPath: + return true + } } return (req.Method == http.MethodPost || req.Method == http.MethodGet) && diff --git a/cmd/generic-handlers_test.go b/cmd/generic-handlers_test.go index 12ba5631e..f6cc6e07e 100644 --- a/cmd/generic-handlers_test.go +++ b/cmd/generic-handlers_test.go @@ -64,6 +64,14 @@ func TestGuessIsRPC(t *testing.T) { if !guessIsRPCReq(r) { t.Fatal("Grid RPC path not detected") } + r = &http.Request{ + Proto: "HTTP/1.1", + Method: http.MethodGet, + URL: &url.URL{Path: grid.RouteLockPath}, + } + if !guessIsRPCReq(r) { + t.Fatal("Grid RPC path not detected") + } } var isHTTPHeaderSizeTooLargeTests = []struct { diff --git a/cmd/grid.go b/cmd/grid.go index 125dda952..a2c8f3973 100644 --- a/cmd/grid.go +++ b/cmd/grid.go @@ -31,9 +31,15 @@ import ( // globalGrid is the global grid manager. var globalGrid atomic.Pointer[grid.Manager] +// globalLockGrid is the global lock grid manager. +var globalLockGrid atomic.Pointer[grid.Manager] + // globalGridStart is a channel that will block startup of grid connections until closed. var globalGridStart = make(chan struct{}) +// globalLockGridStart is a channel that will block startup of lock grid connections until closed. +var globalLockGridStart = make(chan struct{}) + func initGlobalGrid(ctx context.Context, eps EndpointServerPools) error { hosts, local := eps.GridHosts() lookupHost := globalDNSCache.LookupHost @@ -55,9 +61,10 @@ func initGlobalGrid(ctx context.Context, eps EndpointServerPools) error { AuthFn: newCachedAuthToken(), BlockConnect: globalGridStart, // Record incoming and outgoing bytes. - Incoming: globalConnStats.incInternodeInputBytes, - Outgoing: globalConnStats.incInternodeOutputBytes, - TraceTo: globalTrace, + Incoming: globalConnStats.incInternodeInputBytes, + Outgoing: globalConnStats.incInternodeOutputBytes, + TraceTo: globalTrace, + RoutePath: grid.RoutePath, }) if err != nil { return err @@ -65,3 +72,36 @@ func initGlobalGrid(ctx context.Context, eps EndpointServerPools) error { globalGrid.Store(g) return nil } + +func initGlobalLockGrid(ctx context.Context, eps EndpointServerPools) error { + hosts, local := eps.GridHosts() + lookupHost := globalDNSCache.LookupHost + g, err := grid.NewManager(ctx, grid.ManagerOptions{ + // Pass Dialer for websocket grid, make sure we do not + // provide any DriveOPTimeout() function, as that is not + // useful over persistent connections. + Dialer: grid.ConnectWSWithRoutePath( + grid.ContextDialer(xhttp.DialContextWithLookupHost(lookupHost, xhttp.NewInternodeDialContext(rest.DefaultTimeout, globalTCPOptions.ForWebsocket()))), + newCachedAuthToken(), + &tls.Config{ + RootCAs: globalRootCAs, + CipherSuites: fips.TLSCiphers(), + CurvePreferences: fips.TLSCurveIDs(), + }, grid.RouteLockPath), + Local: local, + Hosts: hosts, + AuthToken: validateStorageRequestToken, + AuthFn: newCachedAuthToken(), + BlockConnect: globalGridStart, + // Record incoming and outgoing bytes. + Incoming: globalConnStats.incInternodeInputBytes, + Outgoing: globalConnStats.incInternodeOutputBytes, + TraceTo: globalTrace, + RoutePath: grid.RouteLockPath, + }) + if err != nil { + return err + } + globalLockGrid.Store(g) + return nil +} diff --git a/cmd/lock-rest-client.go b/cmd/lock-rest-client.go index bf162d124..ff8e2c35a 100644 --- a/cmd/lock-rest-client.go +++ b/cmd/lock-rest-client.go @@ -107,5 +107,5 @@ func newLockAPI(endpoint Endpoint) dsync.NetLocker { // Returns a lock rest client. func newlockRESTClient(ep Endpoint) *lockRESTClient { - return &lockRESTClient{globalGrid.Load().Connection(ep.GridHost())} + return &lockRESTClient{globalLockGrid.Load().Connection(ep.GridHost())} } diff --git a/cmd/lock-rest-client_test.go b/cmd/lock-rest-client_test.go index 10beb12a0..15965235a 100644 --- a/cmd/lock-rest-client_test.go +++ b/cmd/lock-rest-client_test.go @@ -39,7 +39,7 @@ func TestLockRESTlient(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err = initGlobalGrid(ctx, []PoolEndpoints{{Endpoints: Endpoints{endpoint, endpointLocal}}}) + err = initGlobalLockGrid(ctx, []PoolEndpoints{{Endpoints: Endpoints{endpoint, endpointLocal}}}) if err != nil { t.Fatal(err) } diff --git a/cmd/lock-rest-server.go b/cmd/lock-rest-server.go index a0adea595..efabef342 100644 --- a/cmd/lock-rest-server.go +++ b/cmd/lock-rest-server.go @@ -111,17 +111,17 @@ func newLockHandler(h grid.HandlerID) *grid.SingleHandler[*dsync.LockArgs, *dsyn } // registerLockRESTHandlers - register lock rest router. -func registerLockRESTHandlers() { +func registerLockRESTHandlers(gm *grid.Manager) { lockServer := &lockRESTServer{ ll: newLocker(), } - logger.FatalIf(lockRPCForceUnlock.Register(globalGrid.Load(), lockServer.ForceUnlockHandler), "unable to register handler") - logger.FatalIf(lockRPCRefresh.Register(globalGrid.Load(), lockServer.RefreshHandler), "unable to register handler") - logger.FatalIf(lockRPCLock.Register(globalGrid.Load(), lockServer.LockHandler), "unable to register handler") - logger.FatalIf(lockRPCUnlock.Register(globalGrid.Load(), lockServer.UnlockHandler), "unable to register handler") - logger.FatalIf(lockRPCRLock.Register(globalGrid.Load(), lockServer.RLockHandler), "unable to register handler") - logger.FatalIf(lockRPCRUnlock.Register(globalGrid.Load(), lockServer.RUnlockHandler), "unable to register handler") + logger.FatalIf(lockRPCForceUnlock.Register(gm, lockServer.ForceUnlockHandler), "unable to register handler") + logger.FatalIf(lockRPCRefresh.Register(gm, lockServer.RefreshHandler), "unable to register handler") + logger.FatalIf(lockRPCLock.Register(gm, lockServer.LockHandler), "unable to register handler") + logger.FatalIf(lockRPCUnlock.Register(gm, lockServer.UnlockHandler), "unable to register handler") + logger.FatalIf(lockRPCRLock.Register(gm, lockServer.RLockHandler), "unable to register handler") + logger.FatalIf(lockRPCRUnlock.Register(gm, lockServer.RUnlockHandler), "unable to register handler") globalLockServer = lockServer.ll diff --git a/cmd/routers.go b/cmd/routers.go index e0cafdbea..c0bfc91df 100644 --- a/cmd/routers.go +++ b/cmd/routers.go @@ -26,20 +26,28 @@ import ( // Composed function registering routers for only distributed Erasure setup. func registerDistErasureRouters(router *mux.Router, endpointServerPools EndpointServerPools) { + var ( + lockGrid = globalLockGrid.Load() + commonGrid = globalGrid.Load() + ) + // Register storage REST router only if its a distributed setup. - registerStorageRESTHandlers(router, endpointServerPools, globalGrid.Load()) + registerStorageRESTHandlers(router, endpointServerPools, commonGrid) // Register peer REST router only if its a distributed setup. - registerPeerRESTHandlers(router, globalGrid.Load()) + registerPeerRESTHandlers(router, commonGrid) // Register bootstrap REST router for distributed setups. - registerBootstrapRESTHandlers(globalGrid.Load()) + registerBootstrapRESTHandlers(commonGrid) // Register distributed namespace lock routers. - registerLockRESTHandlers() + registerLockRESTHandlers(lockGrid) + + // Add lock grid to router + router.Handle(grid.RouteLockPath, adminMiddleware(lockGrid.Handler(storageServerRequestValidate), noGZFlag, noObjLayerFlag)) // Add grid to router - router.Handle(grid.RoutePath, adminMiddleware(globalGrid.Load().Handler(storageServerRequestValidate), noGZFlag, noObjLayerFlag)) + router.Handle(grid.RoutePath, adminMiddleware(commonGrid.Handler(storageServerRequestValidate), noGZFlag, noObjLayerFlag)) } // List of some generic middlewares which are applied for all incoming requests. diff --git a/cmd/server-main.go b/cmd/server-main.go index 4a168bd74..92131d6b3 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -856,6 +856,11 @@ func serverMain(ctx *cli.Context) { logger.FatalIf(initGlobalGrid(GlobalContext, globalEndpoints), "Unable to configure server grid RPC services") }) + // Initialize lock grid + bootstrapTrace("initLockGrid", func() { + logger.FatalIf(initGlobalLockGrid(GlobalContext, globalEndpoints), "Unable to configure server lock grid RPC services") + }) + // Configure server. bootstrapTrace("configureServer", func() { handler, err := configureServerHandler(globalEndpoints) @@ -863,7 +868,8 @@ func serverMain(ctx *cli.Context) { logger.Fatal(config.ErrUnexpectedError(err), "Unable to configure one of server's RPC services") } // Allow grid to start after registering all services. - xioutil.SafeClose(globalGridStart) + close(globalGridStart) + close(globalLockGridStart) httpServer := xhttp.NewServer(getServerListenAddrs()). UseHandler(setCriticalErrorHandler(corsHandler(handler))). diff --git a/internal/grid/connection.go b/internal/grid/connection.go index 8bfe682f6..40f84ba46 100644 --- a/internal/grid/connection.go +++ b/internal/grid/connection.go @@ -1319,7 +1319,11 @@ func (c *Connection) handleConnectMux(ctx context.Context, m message, subID *sub handler = c.handlers.subStateless[*subID] } if handler == nil { - gridLogIf(ctx, c.queueMsg(m, muxConnectError{Error: "Invalid Handler for type"})) + msg := fmt.Sprintf("Invalid Handler for type: %v", m.Handler) + if subID != nil { + msg = fmt.Sprintf("Invalid Handler for type: %v", *subID) + } + gridLogIf(ctx, c.queueMsg(m, muxConnectError{Error: msg})) return } _, _ = c.inStream.LoadOrCompute(m.MuxID, func() *muxServer { @@ -1338,7 +1342,11 @@ func (c *Connection) handleConnectMux(ctx context.Context, m message, subID *sub handler = c.handlers.subStreams[*subID] } if handler == nil { - gridLogIf(ctx, c.queueMsg(m, muxConnectError{Error: "Invalid Handler for type"})) + msg := fmt.Sprintf("Invalid Handler for type: %v", m.Handler) + if subID != nil { + msg = fmt.Sprintf("Invalid Handler for type: %v", *subID) + } + gridLogIf(ctx, c.queueMsg(m, muxConnectError{Error: msg})) return } @@ -1392,7 +1400,11 @@ func (c *Connection) handleRequest(ctx context.Context, m message, subID *subHan handler = c.handlers.subSingle[*subID] } if handler == nil { - gridLogIf(ctx, c.queueMsg(m, muxConnectError{Error: "Invalid Handler for type"})) + msg := fmt.Sprintf("Invalid Handler for type: %v", m.Handler) + if subID != nil { + msg = fmt.Sprintf("Invalid Handler for type: %v", *subID) + } + gridLogIf(ctx, c.queueMsg(m, muxConnectError{Error: msg})) return } diff --git a/internal/grid/debug.go b/internal/grid/debug.go index c6c334198..a6b3e2606 100644 --- a/internal/grid/debug.go +++ b/internal/grid/debug.go @@ -26,7 +26,6 @@ import ( "sync" "time" - xioutil "github.com/minio/minio/internal/ioutil" "github.com/minio/mux" ) @@ -90,6 +89,7 @@ func SetupTestGrid(n int) (*TestGrid, error) { AuthFn: dummyNewToken, AuthToken: dummyTokenValidate, BlockConnect: ready, + RoutePath: RoutePath, }) if err != nil { return nil, err @@ -101,7 +101,7 @@ func SetupTestGrid(n int) (*TestGrid, error) { res.Listeners = append(res.Listeners, listeners[i]) res.Mux = append(res.Mux, m) } - xioutil.SafeClose(ready) + close(ready) for _, m := range res.Managers { for _, remote := range m.Targets() { if err := m.Connection(remote).WaitForConnect(ctx); err != nil { diff --git a/internal/grid/grid.go b/internal/grid/grid.go index 51bc57afd..0b192318e 100644 --- a/internal/grid/grid.go +++ b/internal/grid/grid.go @@ -202,13 +202,12 @@ func bytesOrLength(b []byte) string { // The net.Conn must support all features as described by the net.Conn interface. type ConnDialer func(ctx context.Context, address string) (net.Conn, error) -// ConnectWS returns a function that dials a websocket connection to the given address. -// Route and auth are added to the connection. -func ConnectWS(dial ContextDialer, auth AuthFn, tls *tls.Config) func(ctx context.Context, remote string) (net.Conn, error) { +// ConnectWSWithRoutePath is like ConnectWS but with a custom grid route path. +func ConnectWSWithRoutePath(dial ContextDialer, auth AuthFn, tls *tls.Config, routePath string) func(ctx context.Context, remote string) (net.Conn, error) { return func(ctx context.Context, remote string) (net.Conn, error) { toDial := strings.Replace(remote, "http://", "ws://", 1) toDial = strings.Replace(toDial, "https://", "wss://", 1) - toDial += RoutePath + toDial += routePath dialer := ws.DefaultDialer dialer.ReadBufferSize = readBufferSize @@ -234,5 +233,11 @@ func ConnectWS(dial ContextDialer, auth AuthFn, tls *tls.Config) func(ctx contex } } +// ConnectWS returns a function that dials a websocket connection to the given address. +// Route and auth are added to the connection. +func ConnectWS(dial ContextDialer, auth AuthFn, tls *tls.Config) func(ctx context.Context, remote string) (net.Conn, error) { + return ConnectWSWithRoutePath(dial, auth, tls, RoutePath) +} + // ValidateTokenFn must validate the token and return an error if it is invalid. type ValidateTokenFn func(token string) error diff --git a/internal/grid/grid_test.go b/internal/grid/grid_test.go index 35850c039..2a495767a 100644 --- a/internal/grid/grid_test.go +++ b/internal/grid/grid_test.go @@ -166,7 +166,7 @@ func TestSingleRoundtripNotReady(t *testing.T) { const testPayload = "Hello Grid World!" // Single requests should have remote errors. _, err := remoteConn.Request(context.Background(), handlerTest, []byte(testPayload)) - if v, ok := err.(*RemoteErr); !ok || v.Error() != "Invalid Handler for type" { + if _, ok := err.(*RemoteErr); !ok { t.Fatalf("Unexpected error: %v, %T", err, err) } // Streams should not be able to set up until registered. diff --git a/internal/grid/manager.go b/internal/grid/manager.go index 89ae20090..16dcf272a 100644 --- a/internal/grid/manager.go +++ b/internal/grid/manager.go @@ -45,6 +45,9 @@ const ( // RoutePath is the remote path to connect to. RoutePath = "/minio/grid/" + apiVersion + + // RouteLockPath is the remote lock path to connect to. + RouteLockPath = "/minio/grid/lock/" + apiVersion ) // Manager will contain all the connections to the grid. @@ -65,6 +68,9 @@ type Manager struct { // authToken is a function that will validate a token. authToken ValidateTokenFn + + // routePath indicates the dial route path + routePath string } // ManagerOptions are options for creating a new grid manager. @@ -74,6 +80,7 @@ type ManagerOptions struct { Incoming func(n int64) // Record incoming bytes. Outgoing func(n int64) // Record outgoing bytes. BlockConnect chan struct{} // If set, incoming and outgoing connections will be blocked until closed. + RoutePath string TraceTo *pubsub.PubSub[madmin.TraceInfo, madmin.TraceType] Dialer ConnDialer // Sign a token for the given audience. @@ -99,6 +106,7 @@ func NewManager(ctx context.Context, o ManagerOptions) (*Manager, error) { targets: make(map[string]*Connection, len(o.Hosts)), local: o.Local, authToken: o.AuthToken, + routePath: o.RoutePath, } m.handlers.init() if ctx == nil { @@ -137,7 +145,7 @@ func NewManager(ctx context.Context, o ManagerOptions) (*Manager, error) { // AddToMux will add the grid manager to the given mux. func (m *Manager) AddToMux(router *mux.Router, authReq func(r *http.Request) error) { - router.Handle(RoutePath, m.Handler(authReq)) + router.Handle(m.routePath, m.Handler(authReq)) } // Handler returns a handler that can be used to serve grid requests.