diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index 2d4d71c44..097939183 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -1409,9 +1409,15 @@ func (a adminAPIHandlers) TraceHandler(w http.ResponseWriter, r *http.Request) { peers, _ := newPeerRestClients(globalEndpoints) - globalTrace.Subscribe(traceCh, ctx.Done(), func(entry interface{}) bool { + traceFn := func(entry interface{}) bool { return mustTrace(entry, traceOpts) - }) + } + + err = globalTrace.Subscribe(traceCh, ctx.Done(), traceFn) + if err != nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrSlowDown), r.URL) + return + } for _, peer := range peers { if peer == nil { @@ -1483,7 +1489,11 @@ func (a adminAPIHandlers) ConsoleLogHandler(w http.ResponseWriter, r *http.Reque peers, _ := newPeerRestClients(globalEndpoints) - globalConsoleSys.Subscribe(logCh, ctx.Done(), node, limitLines, logKind, nil) + err = globalConsoleSys.Subscribe(logCh, ctx.Done(), node, limitLines, logKind, nil) + if err != nil { + writeErrorResponseJSON(ctx, w, errorCodes.ToAPIErr(ErrSlowDown), r.URL) + return + } for _, peer := range peers { if peer == nil { diff --git a/cmd/api-router.go b/cmd/api-router.go index 537089e88..3fa6b05a0 100644 --- a/cmd/api-router.go +++ b/cmd/api-router.go @@ -342,7 +342,7 @@ func registerAPIRouter(router *mux.Router) { collectAPIStats("getbucketnotification", maxClients(gz(httpTraceAll(api.GetBucketNotificationHandler))))).Queries("notification", "") // ListenNotification router.Methods(http.MethodGet).HandlerFunc( - collectAPIStats("listennotification", maxClients(gz(httpTraceAll(api.ListenNotificationHandler))))).Queries("events", "{events:.*}") + collectAPIStats("listennotification", gz(httpTraceAll(api.ListenNotificationHandler)))).Queries("events", "{events:.*}") // ResetBucketReplicationStatus - MinIO extension API router.Methods(http.MethodGet).HandlerFunc( collectAPIStats("resetbucketreplicationstatus", maxClients(gz(httpTraceAll(api.ResetBucketReplicationStatusHandler))))).Queries("replication-reset-status", "") @@ -474,7 +474,7 @@ func registerAPIRouter(router *mux.Router) { // ListenNotification apiRouter.Methods(http.MethodGet).Path(SlashSeparator).HandlerFunc( - collectAPIStats("listennotification", maxClients(gz(httpTraceAll(api.ListenNotificationHandler))))).Queries("events", "{events:.*}") + collectAPIStats("listennotification", gz(httpTraceAll(api.ListenNotificationHandler)))).Queries("events", "{events:.*}") // ListBuckets apiRouter.Methods(http.MethodGet).Path(SlashSeparator).HandlerFunc( diff --git a/cmd/consolelogger.go b/cmd/consolelogger.go index e1d1479b8..24b4e3624 100644 --- a/cmd/consolelogger.go +++ b/cmd/consolelogger.go @@ -45,7 +45,7 @@ type HTTPConsoleLoggerSys struct { // NewConsoleLogger - creates new HTTPConsoleLoggerSys with all nodes subscribed to // the console logging pub sub system func NewConsoleLogger(ctx context.Context) *HTTPConsoleLoggerSys { - ps := pubsub.New() + ps := pubsub.New(8) return &HTTPConsoleLoggerSys{ pubsub: ps, console: console.New(), @@ -75,7 +75,7 @@ func (sys *HTTPConsoleLoggerSys) HasLogListeners() bool { } // Subscribe starts console logging for this node. -func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, node string, last int, logKind string, filter func(entry interface{}) bool) { +func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, node string, last int, logKind string, filter func(entry interface{}) bool) error { // Enable console logging for remote client. if !sys.HasLogListeners() { logger.AddSystemTarget(sys) @@ -111,11 +111,12 @@ func (sys *HTTPConsoleLoggerSys) Subscribe(subCh chan interface{}, doneCh <-chan select { case subCh <- entry: case <-doneCh: - return + return nil } } } - sys.pubsub.Subscribe(subCh, doneCh, filter) + + return sys.pubsub.Subscribe(subCh, doneCh, filter) } // Init if HTTPConsoleLoggerSys is valid, always returns nil right now diff --git a/cmd/globals.go b/cmd/globals.go index 37eeb23cd..9fa15020d 100644 --- a/cmd/globals.go +++ b/cmd/globals.go @@ -226,10 +226,10 @@ var ( // global Trace system to send HTTP request/response // and Storage/OS calls info to registered listeners. - globalTrace = pubsub.New() + globalTrace = pubsub.New(8) // global Listen system to send S3 API events to registered listeners - globalHTTPListen = pubsub.New() + globalHTTPListen = pubsub.New(0) // global console system to send console logs to // registered listeners diff --git a/cmd/listen-notification-handlers.go b/cmd/listen-notification-handlers.go index 3850bba18..1b7b301bb 100644 --- a/cmd/listen-notification-handlers.go +++ b/cmd/listen-notification-handlers.go @@ -127,7 +127,7 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r peers, _ := newPeerRestClients(globalEndpoints) - globalHTTPListen.Subscribe(listenCh, ctx.Done(), func(evI interface{}) bool { + listenFn := func(evI interface{}) bool { ev, ok := evI.(event.Event) if !ok { return false @@ -138,7 +138,13 @@ func (api objectAPIHandlers) ListenNotificationHandler(w http.ResponseWriter, r } } return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key) - }) + } + + err := globalHTTPListen.Subscribe(listenCh, ctx.Done(), listenFn) + if err != nil { + writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrSlowDown), r.URL) + return + } if bucketName != "" { values.Set(peerRESTListenBucket, bucketName) diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index eb1b95bbb..ea9f01223 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -933,7 +933,7 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) { // Use buffered channel to take care of burst sends or slow w.Write() ch := make(chan interface{}, 2000) - globalHTTPListen.Subscribe(ch, doneCh, func(evI interface{}) bool { + listenFn := func(evI interface{}) bool { ev, ok := evI.(event.Event) if !ok { return false @@ -944,7 +944,13 @@ func (s *peerRESTServer) ListenHandler(w http.ResponseWriter, r *http.Request) { } } return rulesMap.MatchSimple(ev.EventName, ev.S3.Object.Key) - }) + } + + err := globalHTTPListen.Subscribe(ch, doneCh, listenFn) + if err != nil { + s.writeErrorResponse(w, err) + return + } keepAliveTicker := time.NewTicker(500 * time.Millisecond) defer keepAliveTicker.Stop() @@ -1005,9 +1011,15 @@ func (s *peerRESTServer) TraceHandler(w http.ResponseWriter, r *http.Request) { // Use buffered channel to take care of burst sends or slow w.Write() ch := make(chan interface{}, 2000) - globalTrace.Subscribe(ch, doneCh, func(entry interface{}) bool { + traceFn := func(entry interface{}) bool { return mustTrace(entry, traceOpts) - }) + } + + err = globalTrace.Subscribe(ch, doneCh, traceFn) + if err != nil { + s.writeErrorResponse(w, err) + return + } keepAliveTicker := time.NewTicker(500 * time.Millisecond) defer keepAliveTicker.Stop() @@ -1092,7 +1104,11 @@ func (s *peerRESTServer) ConsoleLogHandler(w http.ResponseWriter, r *http.Reques defer close(doneCh) ch := make(chan interface{}, 2000) - globalConsoleSys.Subscribe(ch, doneCh, "", 0, string(logger.All), nil) + err := globalConsoleSys.Subscribe(ch, doneCh, "", 0, string(logger.All), nil) + if err != nil { + s.writeErrorResponse(w, err) + return + } keepAliveTicker := time.NewTicker(500 * time.Millisecond) defer keepAliveTicker.Stop() diff --git a/internal/pubsub/pubsub.go b/internal/pubsub/pubsub.go index ca3821877..866af2905 100644 --- a/internal/pubsub/pubsub.go +++ b/internal/pubsub/pubsub.go @@ -18,6 +18,7 @@ package pubsub import ( + "fmt" "sync" "sync/atomic" ) @@ -32,6 +33,7 @@ type Sub struct { type PubSub struct { subs []*Sub numSubscribers int32 + maxSubscribers int32 sync.RWMutex } @@ -53,13 +55,18 @@ func (ps *PubSub) Publish(item interface{}) { } // Subscribe - Adds a subscriber to pubsub system -func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filter func(entry interface{}) bool) { +func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filter func(entry interface{}) bool) error { + totalSubs := atomic.AddInt32(&ps.numSubscribers, 1) + if ps.maxSubscribers > 0 && totalSubs > ps.maxSubscribers { + atomic.AddInt32(&ps.numSubscribers, -1) + return fmt.Errorf("the limit of `%d` subscribers is reached", ps.maxSubscribers) + } + ps.Lock() defer ps.Unlock() sub := &Sub{subCh, filter} ps.subs = append(ps.subs, sub) - atomic.AddInt32(&ps.numSubscribers, 1) go func() { <-doneCh @@ -74,6 +81,8 @@ func (ps *PubSub) Subscribe(subCh chan interface{}, doneCh <-chan struct{}, filt } atomic.AddInt32(&ps.numSubscribers, -1) }() + + return nil } // NumSubscribers returns the number of current subscribers @@ -81,7 +90,8 @@ func (ps *PubSub) NumSubscribers() int32 { return atomic.LoadInt32(&ps.numSubscribers) } -// New inits a PubSub system -func New() *PubSub { - return &PubSub{} +// New inits a PubSub system with a limit of maximum +// subscribers unless zero is specified +func New(maxSubscribers int32) *PubSub { + return &PubSub{maxSubscribers: maxSubscribers} } diff --git a/internal/pubsub/pubsub_test.go b/internal/pubsub/pubsub_test.go index 457d910a9..838d90ad0 100644 --- a/internal/pubsub/pubsub_test.go +++ b/internal/pubsub/pubsub_test.go @@ -24,68 +24,100 @@ import ( ) func TestSubscribe(t *testing.T) { - ps := New() + ps := New(2) ch1 := make(chan interface{}, 1) ch2 := make(chan interface{}, 1) doneCh := make(chan struct{}) defer close(doneCh) - ps.Subscribe(ch1, doneCh, nil) - ps.Subscribe(ch2, doneCh, nil) + if err := ps.Subscribe(ch1, doneCh, nil); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := ps.Subscribe(ch2, doneCh, nil); err != nil { + t.Fatalf("unexpected error: %v", err) + } ps.Lock() defer ps.Unlock() if len(ps.subs) != 2 { - t.Errorf("expected 2 subscribers") + t.Fatalf("expected 2 subscribers") + } +} + +func TestSubscribeExceedingLimit(t *testing.T) { + ps := New(2) + ch1 := make(chan interface{}, 1) + ch2 := make(chan interface{}, 1) + ch3 := make(chan interface{}, 1) + doneCh := make(chan struct{}) + defer close(doneCh) + if err := ps.Subscribe(ch1, doneCh, nil); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := ps.Subscribe(ch2, doneCh, nil); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := ps.Subscribe(ch3, doneCh, nil); err == nil { + t.Fatalf("unexpected nil err") } } func TestUnsubscribe(t *testing.T) { - ps := New() + ps := New(2) ch1 := make(chan interface{}, 1) ch2 := make(chan interface{}, 1) doneCh1 := make(chan struct{}) doneCh2 := make(chan struct{}) - ps.Subscribe(ch1, doneCh1, nil) - ps.Subscribe(ch2, doneCh2, nil) + if err := ps.Subscribe(ch1, doneCh1, nil); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := ps.Subscribe(ch2, doneCh2, nil); err != nil { + t.Fatalf("unexpected error: %v", err) + } close(doneCh1) // Allow for the above statement to take effect. time.Sleep(100 * time.Millisecond) ps.Lock() if len(ps.subs) != 1 { - t.Errorf("expected 1 subscriber") + t.Fatal("expected 1 subscriber") } ps.Unlock() close(doneCh2) } func TestPubSub(t *testing.T) { - ps := New() + ps := New(1) ch1 := make(chan interface{}, 1) doneCh1 := make(chan struct{}) defer close(doneCh1) - ps.Subscribe(ch1, doneCh1, func(entry interface{}) bool { return true }) + if err := ps.Subscribe(ch1, doneCh1, func(entry interface{}) bool { return true }); err != nil { + t.Fatalf("unexpected error: %v", err) + } val := "hello" ps.Publish(val) msg := <-ch1 if msg != "hello" { - t.Errorf(fmt.Sprintf("expected %s , found %s", val, msg)) + t.Fatalf(fmt.Sprintf("expected %s , found %s", val, msg)) } } func TestMultiPubSub(t *testing.T) { - ps := New() + ps := New(2) ch1 := make(chan interface{}, 1) ch2 := make(chan interface{}, 1) doneCh := make(chan struct{}) defer close(doneCh) - ps.Subscribe(ch1, doneCh, func(entry interface{}) bool { return true }) - ps.Subscribe(ch2, doneCh, func(entry interface{}) bool { return true }) + if err := ps.Subscribe(ch1, doneCh, func(entry interface{}) bool { return true }); err != nil { + t.Fatalf("unexpected error: %v", err) + } + if err := ps.Subscribe(ch2, doneCh, func(entry interface{}) bool { return true }); err != nil { + t.Fatalf("unexpected error: %v", err) + } val := "hello" ps.Publish(val) msg1 := <-ch1 msg2 := <-ch2 if msg1 != "hello" && msg2 != "hello" { - t.Errorf(fmt.Sprintf("expected both subscribers to have%s , found %s and %s", val, msg1, msg2)) + t.Fatalf(fmt.Sprintf("expected both subscribers to have%s , found %s and %s", val, msg1, msg2)) } }