From f9bbad1148db0300977cd666a76a9d5609c884b6 Mon Sep 17 00:00:00 2001 From: Julien Date: Fri, 27 Sep 2024 13:51:50 +0200 Subject: [PATCH 1/2] Limit the number of SSE Subscribers to 16 by default Signed-off-by: Julien --- cmd/prometheus/main.go | 52 ++++++++++--------- docs/command-line/prometheus.md | 1 + web/api/notifications.go | 25 +++++---- web/api/notifications_test.go | 47 ++++++++++++++--- web/api/v1/api.go | 10 ++-- .../src/components/NotificationsProvider.tsx | 3 +- web/web.go | 2 +- 7 files changed, 94 insertions(+), 46 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index dd068b86c5..f39eba3c31 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -135,24 +135,25 @@ func agentOnlyFlag(app *kingpin.Application, name, help string) *kingpin.FlagCla type flagConfig struct { configFile string - agentStoragePath string - serverStoragePath string - notifier notifier.Options - forGracePeriod model.Duration - outageTolerance model.Duration - resendDelay model.Duration - maxConcurrentEvals int64 - web web.Options - scrape scrape.Options - tsdb tsdbOptions - agent agentOptions - lookbackDelta model.Duration - webTimeout model.Duration - queryTimeout model.Duration - queryConcurrency int - queryMaxSamples int - RemoteFlushDeadline model.Duration - nameEscapingScheme string + agentStoragePath string + serverStoragePath string + notifier notifier.Options + forGracePeriod model.Duration + outageTolerance model.Duration + resendDelay model.Duration + maxConcurrentEvals int64 + web web.Options + scrape scrape.Options + tsdb tsdbOptions + agent agentOptions + lookbackDelta model.Duration + webTimeout model.Duration + queryTimeout model.Duration + queryConcurrency int + queryMaxSamples int + RemoteFlushDeadline model.Duration + nameEscapingScheme string + maxNotificationsSubscribers int enableAutoReload bool autoReloadInterval model.Duration @@ -274,17 +275,13 @@ func main() { ) } - notifs := api.NewNotifications(prometheus.DefaultRegisterer) - cfg := flagConfig{ notifier: notifier.Options{ Registerer: prometheus.DefaultRegisterer, }, web: web.Options{ - Registerer: prometheus.DefaultRegisterer, - Gatherer: prometheus.DefaultGatherer, - NotificationsSub: notifs.Sub, - NotificationsGetter: notifs.Get, + Registerer: prometheus.DefaultRegisterer, + Gatherer: prometheus.DefaultGatherer, }, promlogConfig: promlog.Config{}, } @@ -319,6 +316,9 @@ func main() { a.Flag("web.max-connections", "Maximum number of simultaneous connections across all listeners."). Default("512").IntVar(&cfg.web.MaxConnections) + a.Flag("web.max-notifications-subscribers", "Limits the maximum number of subscribers that can concurrently receive live notifications. If the limit is reached, new subscription requests will be denied until existing connections close."). + Default("16").IntVar(&cfg.maxNotificationsSubscribers) + a.Flag("web.external-url", "The URL under which Prometheus is externally reachable (for example, if Prometheus is served via a reverse proxy). Used for generating relative and absolute links back to Prometheus itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Prometheus. If omitted, relevant URL components will be derived automatically."). PlaceHolder("").StringVar(&cfg.prometheusURL) @@ -500,6 +500,10 @@ func main() { logger := promlog.New(&cfg.promlogConfig) + notifs := api.NewNotifications(cfg.maxNotificationsSubscribers, prometheus.DefaultRegisterer) + cfg.web.NotificationsSub = notifs.Sub + cfg.web.NotificationsGetter = notifs.Get + if err := cfg.setFeatureListOptions(logger); err != nil { fmt.Fprintln(os.Stderr, fmt.Errorf("Error parsing feature list: %w", err)) os.Exit(1) diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md index 7737b50210..eacb45ad07 100644 --- a/docs/command-line/prometheus.md +++ b/docs/command-line/prometheus.md @@ -21,6 +21,7 @@ The Prometheus monitoring server | --web.config.file | [EXPERIMENTAL] Path to configuration file that can enable TLS or authentication. | | | --web.read-timeout | Maximum duration before timing out read of the request, and closing idle connections. | `5m` | | --web.max-connections | Maximum number of simultaneous connections across all listeners. | `512` | +| --web.max-notifications-subscribers | Limits the maximum number of subscribers that can concurrently receive live notifications. If the limit is reached, new subscription requests will be denied until existing connections close. | `16` | | --web.external-url | The URL under which Prometheus is externally reachable (for example, if Prometheus is served via a reverse proxy). Used for generating relative and absolute links back to Prometheus itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Prometheus. If omitted, relevant URL components will be derived automatically. | | | --web.route-prefix | Prefix for the internal routes of web endpoints. Defaults to path of --web.external-url. | | | --web.user-assets | Path to static asset directory, available at /user. | | diff --git a/web/api/notifications.go b/web/api/notifications.go index 47f29f6ebe..976f0b0768 100644 --- a/web/api/notifications.go +++ b/web/api/notifications.go @@ -34,9 +34,10 @@ type Notification struct { // Notifications stores a list of Notification objects. // It also manages live subscribers that receive notifications via channels. type Notifications struct { - mu sync.Mutex - notifications []Notification - subscribers map[chan Notification]struct{} // Active subscribers. + mu sync.Mutex + notifications []Notification + subscribers map[chan Notification]struct{} // Active subscribers. + maxSubscribers int subscriberGauge prometheus.Gauge notificationsSent prometheus.Counter @@ -44,9 +45,10 @@ type Notifications struct { } // NewNotifications creates a new Notifications instance. -func NewNotifications(reg prometheus.Registerer) *Notifications { +func NewNotifications(maxSubscribers int, reg prometheus.Registerer) *Notifications { n := &Notifications{ - subscribers: make(map[chan Notification]struct{}), + subscribers: make(map[chan Notification]struct{}), + maxSubscribers: maxSubscribers, subscriberGauge: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "prometheus", Subsystem: "api", @@ -147,10 +149,16 @@ func (n *Notifications) Get() []Notification { // Sub allows a client to subscribe to live notifications. // It returns a channel where the subscriber will receive notifications and a function to unsubscribe. // Each subscriber has its own goroutine to handle notifications and prevent blocking. -func (n *Notifications) Sub() (<-chan Notification, func()) { +func (n *Notifications) Sub() (<-chan Notification, func(), bool) { + n.mu.Lock() + defer n.mu.Unlock() + + if len(n.subscribers) >= n.maxSubscribers { + return nil, nil, false + } + ch := make(chan Notification, 10) // Buffered channel to prevent blocking. - n.mu.Lock() // Add the new subscriber to the list. n.subscribers[ch] = struct{}{} n.subscriberGauge.Set(float64(len(n.subscribers))) @@ -159,7 +167,6 @@ func (n *Notifications) Sub() (<-chan Notification, func()) { for _, notification := range n.notifications { ch <- notification } - n.mu.Unlock() // Unsubscribe function to remove the channel from subscribers. unsubscribe := func() { @@ -172,5 +179,5 @@ func (n *Notifications) Sub() (<-chan Notification, func()) { n.subscriberGauge.Set(float64(len(n.subscribers))) } - return ch, unsubscribe + return ch, unsubscribe, true } diff --git a/web/api/notifications_test.go b/web/api/notifications_test.go index 7aa5961638..437ff1ec4b 100644 --- a/web/api/notifications_test.go +++ b/web/api/notifications_test.go @@ -23,7 +23,7 @@ import ( // TestNotificationLifecycle tests adding, modifying, and deleting notifications. func TestNotificationLifecycle(t *testing.T) { - notifs := NewNotifications(nil) + notifs := NewNotifications(10, nil) // Add a notification. notifs.AddNotification("Test Notification 1") @@ -47,10 +47,11 @@ func TestNotificationLifecycle(t *testing.T) { // TestSubscriberReceivesNotifications tests that a subscriber receives notifications, including modifications and deletions. func TestSubscriberReceivesNotifications(t *testing.T) { - notifs := NewNotifications(nil) + notifs := NewNotifications(10, nil) // Subscribe to notifications. - sub, unsubscribe := notifs.Sub() + sub, unsubscribe, ok := notifs.Sub() + require.True(t, ok) var wg sync.WaitGroup wg.Add(1) @@ -103,12 +104,14 @@ func TestSubscriberReceivesNotifications(t *testing.T) { // TestMultipleSubscribers tests that multiple subscribers receive notifications independently. func TestMultipleSubscribers(t *testing.T) { - notifs := NewNotifications(nil) + notifs := NewNotifications(10, nil) // Subscribe two subscribers to notifications. - sub1, unsubscribe1 := notifs.Sub() + sub1, unsubscribe1, ok1 := notifs.Sub() + require.True(t, ok1) - sub2, unsubscribe2 := notifs.Sub() + sub2, unsubscribe2, ok2 := notifs.Sub() + require.True(t, ok2) var wg sync.WaitGroup wg.Add(2) @@ -157,10 +160,11 @@ func TestMultipleSubscribers(t *testing.T) { // TestUnsubscribe tests that unsubscribing prevents further notifications from being received. func TestUnsubscribe(t *testing.T) { - notifs := NewNotifications(nil) + notifs := NewNotifications(10, nil) // Subscribe to notifications. - sub, unsubscribe := notifs.Sub() + sub, unsubscribe, ok := notifs.Sub() + require.True(t, ok) var wg sync.WaitGroup wg.Add(1) @@ -190,3 +194,30 @@ func TestUnsubscribe(t *testing.T) { require.Len(t, receivedNotifications, 1, "Expected 1 notification before unsubscribe.") require.Equal(t, "Test Notification 1", receivedNotifications[0].Text, "Unexpected notification text.") } + +// TestMaxSubscribers tests that exceeding the max subscribers limit prevents additional subscriptions. +func TestMaxSubscribers(t *testing.T) { + maxSubscribers := 2 + notifs := NewNotifications(maxSubscribers, nil) + + // Subscribe the maximum number of subscribers. + _, unsubscribe1, ok1 := notifs.Sub() + require.True(t, ok1, "Expected first subscription to succeed.") + + _, unsubscribe2, ok2 := notifs.Sub() + require.True(t, ok2, "Expected second subscription to succeed.") + + // Try to subscribe more than the max allowed. + _, _, ok3 := notifs.Sub() + require.False(t, ok3, "Expected third subscription to fail due to max subscriber limit.") + + // Unsubscribe one subscriber and try again. + unsubscribe1() + + _, unsubscribe4, ok4 := notifs.Sub() + require.True(t, ok4, "Expected subscription to succeed after unsubscribing a subscriber.") + + // Clean up the subscriptions. + unsubscribe2() + unsubscribe4() +} diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 5eadbdbe75..4589e14e02 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -215,7 +215,7 @@ type API struct { isAgent bool statsRenderer StatsRenderer notificationsGetter func() []api.Notification - notificationsSub func() (<-chan api.Notification, func()) + notificationsSub func() (<-chan api.Notification, func(), bool) remoteWriteHandler http.Handler remoteReadHandler http.Handler @@ -250,7 +250,7 @@ func NewAPI( runtimeInfo func() (RuntimeInfo, error), buildInfo *PrometheusVersion, notificationsGetter func() []api.Notification, - notificationsSub func() (<-chan api.Notification, func()), + notificationsSub func() (<-chan api.Notification, func(), bool), gatherer prometheus.Gatherer, registerer prometheus.Registerer, statsRenderer StatsRenderer, @@ -1690,7 +1690,11 @@ func (api *API) notificationsSSE(w http.ResponseWriter, r *http.Request) { w.Header().Set("Connection", "keep-alive") // Subscribe to notifications. - notifications, unsubscribe := api.notificationsSub() + notifications, unsubscribe, ok := api.notificationsSub() + if !ok { + w.WriteHeader(http.StatusNoContent) + return + } defer unsubscribe() // Set up a flusher to push the response to the client. diff --git a/web/ui/mantine-ui/src/components/NotificationsProvider.tsx b/web/ui/mantine-ui/src/components/NotificationsProvider.tsx index 73de54131e..44510061ed 100644 --- a/web/ui/mantine-ui/src/components/NotificationsProvider.tsx +++ b/web/ui/mantine-ui/src/components/NotificationsProvider.tsx @@ -42,7 +42,8 @@ export const NotificationsProvider: React.FC<{ children: React.ReactNode }> = ({ eventSource.onerror = () => { eventSource.close(); - setIsConnectionError(true); + // We do not call setIsConnectionError(true), we only set it to true if + // the fallback API does not work either. setShouldFetchFromAPI(true); }; diff --git a/web/web.go b/web/web.go index 87e4164c58..724ca91051 100644 --- a/web/web.go +++ b/web/web.go @@ -268,7 +268,7 @@ type Options struct { Notifier *notifier.Manager Version *PrometheusVersion NotificationsGetter func() []api.Notification - NotificationsSub func() (<-chan api.Notification, func()) + NotificationsSub func() (<-chan api.Notification, func(), bool) Flags map[string]string ListenAddresses []string From e34563bfe0ac78d81d3147aed9e03789945e1c74 Mon Sep 17 00:00:00 2001 From: Julien Date: Fri, 27 Sep 2024 15:58:41 +0200 Subject: [PATCH 2/2] Retry SSE connection unless max clients have been reached. This switches from the prehistoric EventSource API to the more modern fetch-event-source package. That packages gives us full control over the retries. It also gives us the opportunity to close the event source when the browser tab is hidden, saving resources. Signed-off-by: Julien --- web/api/v1/api.go | 4 ++ web/ui/mantine-ui/package.json | 1 + .../src/components/NotificationsProvider.tsx | 57 ++++++++++++------- web/ui/package-lock.json | 6 ++ 4 files changed, 48 insertions(+), 20 deletions(-) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 4589e14e02..d3cc7d718d 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -1704,6 +1704,10 @@ func (api *API) notificationsSSE(w http.ResponseWriter, r *http.Request) { return } + // Flush the response to ensure the headers are immediately and eventSource + // onopen is triggered client-side. + flusher.Flush() + for { select { case notification := <-notifications: diff --git a/web/ui/mantine-ui/package.json b/web/ui/mantine-ui/package.json index ec8ef89026..aae8ba99b1 100644 --- a/web/ui/mantine-ui/package.json +++ b/web/ui/mantine-ui/package.json @@ -25,6 +25,7 @@ "@mantine/dates": "^7.11.2", "@mantine/hooks": "^7.11.2", "@mantine/notifications": "^7.11.2", + "@microsoft/fetch-event-source": "^2.0.1", "@nexucis/fuzzy": "^0.5.1", "@nexucis/kvsearch": "^0.9.1", "@prometheus-io/codemirror-promql": "0.300.0-beta.0", diff --git a/web/ui/mantine-ui/src/components/NotificationsProvider.tsx b/web/ui/mantine-ui/src/components/NotificationsProvider.tsx index 44510061ed..a331e524b0 100644 --- a/web/ui/mantine-ui/src/components/NotificationsProvider.tsx +++ b/web/ui/mantine-ui/src/components/NotificationsProvider.tsx @@ -3,6 +3,7 @@ import { useSettings } from '../state/settingsSlice'; import { NotificationsContext } from '../state/useNotifications'; import { Notification, NotificationsResult } from "../api/responseTypes/notifications"; import { useAPIQuery } from '../api/api'; +import { fetchEventSource } from '@microsoft/fetch-event-source'; export const NotificationsProvider: React.FC<{ children: React.ReactNode }> = ({ children }) => { const { pathPrefix } = useSettings(); @@ -24,31 +25,47 @@ export const NotificationsProvider: React.FC<{ children: React.ReactNode }> = ({ }, [data, isError]); useEffect(() => { - const eventSource = new EventSource(`${pathPrefix}/api/v1/notifications/live`); - - eventSource.onmessage = (event) => { - const notification: Notification = JSON.parse(event.data); - - setNotifications((prev: Notification[]) => { - const updatedNotifications = [...prev.filter((n: Notification) => n.text !== notification.text)]; - - if (notification.active) { - updatedNotifications.push(notification); + const controller = new AbortController(); + fetchEventSource(`${pathPrefix}/api/v1/notifications/live`, { + signal: controller.signal, + async onopen(response) { + if (response.ok) { + if (response.status === 200) { + setNotifications([]); + setIsConnectionError(false); + } else if (response.status === 204) { + controller.abort(); + setShouldFetchFromAPI(true); + } + } else { + setIsConnectionError(true); + throw new Error(`Unexpected response: ${response.status} ${response.statusText}`); } + }, + onmessage(event) { + const notification: Notification = JSON.parse(event.data); - return updatedNotifications; - }); - }; + setNotifications((prev: Notification[]) => { + const updatedNotifications = [...prev.filter((n: Notification) => n.text !== notification.text)]; - eventSource.onerror = () => { - eventSource.close(); - // We do not call setIsConnectionError(true), we only set it to true if - // the fallback API does not work either. - setShouldFetchFromAPI(true); - }; + if (notification.active) { + updatedNotifications.push(notification); + } + + return updatedNotifications; + }); + }, + onclose() { + throw new Error("Server closed the connection"); + }, + onerror() { + setIsConnectionError(true); + return 5000; + }, + }); return () => { - eventSource.close(); + controller.abort(); }; }, [pathPrefix]); diff --git a/web/ui/package-lock.json b/web/ui/package-lock.json index 2dc1fcdfe8..49a9074806 100644 --- a/web/ui/package-lock.json +++ b/web/ui/package-lock.json @@ -39,6 +39,7 @@ "@mantine/dates": "^7.11.2", "@mantine/hooks": "^7.11.2", "@mantine/notifications": "^7.11.2", + "@microsoft/fetch-event-source": "^2.0.1", "@nexucis/fuzzy": "^0.5.1", "@nexucis/kvsearch": "^0.9.1", "@prometheus-io/codemirror-promql": "0.300.0-beta.0", @@ -2255,6 +2256,11 @@ "react": "^18.2.0" } }, + "node_modules/@microsoft/fetch-event-source": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz", + "integrity": "sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA==" + }, "node_modules/@nexucis/fuzzy": { "version": "0.5.1", "resolved": "https://registry.npmjs.org/@nexucis/fuzzy/-/fuzzy-0.5.1.tgz",