diff --git a/cmd/admin-handlers.go b/cmd/admin-handlers.go index ff8a71106..0df658ad9 100644 --- a/cmd/admin-handlers.go +++ b/cmd/admin-handlers.go @@ -2572,7 +2572,7 @@ func assignPoolNumbers(servers []madmin.ServerProperties) { func fetchLambdaInfo() []map[string][]madmin.TargetIDStatus { lambdaMap := make(map[string][]madmin.TargetIDStatus) - for _, tgt := range globalNotifyTargetList.Targets() { + for _, tgt := range globalEventNotifier.Targets() { targetIDStatus := make(map[string]madmin.Status) active, _ := tgt.IsActive() targetID := tgt.ID() diff --git a/cmd/event-notification.go b/cmd/event-notification.go index 412f6045e..cba7e4ab2 100644 --- a/cmd/event-notification.go +++ b/cmd/event-notification.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "net/url" + "runtime" "strings" "sync" @@ -36,16 +37,14 @@ import ( type EventNotifier struct { sync.RWMutex targetList *event.TargetList - targetResCh chan event.TargetIDResult bucketRulesMap map[string]event.RulesMap } // NewEventNotifier - creates new event notification object. -func NewEventNotifier() *EventNotifier { +func NewEventNotifier(ctx context.Context) *EventNotifier { // targetList/bucketRulesMap/bucketRemoteTargetRulesMap are populated by NotificationSys.InitBucketTargets() return &EventNotifier{ - targetList: event.NewTargetList(), - targetResCh: make(chan event.TargetIDResult), + targetList: event.NewTargetList(ctx), bucketRulesMap: make(map[string]event.RulesMap), } } @@ -90,6 +89,11 @@ func (evnot *EventNotifier) set(bucket BucketInfo, meta BucketMetadata) { evnot.AddRulesMap(bucket.Name, config.ToRulesMap()) } +// Targets returns all the registered targets +func (evnot *EventNotifier) Targets() []event.Target { + return evnot.targetList.Targets() +} + // InitBucketTargets - initializes event notification system from notification.xml of all buckets. func (evnot *EventNotifier) InitBucketTargets(ctx context.Context, objAPI ObjectLayer) error { if objAPI == nil { @@ -99,17 +103,7 @@ func (evnot *EventNotifier) InitBucketTargets(ctx context.Context, objAPI Object if err := evnot.targetList.Add(globalNotifyTargetList.Targets()...); err != nil { return err } - - go func() { - for res := range evnot.targetResCh { - if res.Err != nil { - reqInfo := &logger.ReqInfo{} - reqInfo.AppendTags("targetID", res.ID.String()) - logger.LogOnceIf(logger.SetReqInfo(GlobalContext, reqInfo), res.Err, res.ID.String()) - } - } - }() - + evnot.targetList = evnot.targetList.Init(runtime.GOMAXPROCS(0)) // TODO: make this configurable (y4m4) return nil } @@ -159,7 +153,7 @@ func (evnot *EventNotifier) Send(args eventArgs) { } // If MINIO_API_SYNC_EVENTS is set, send events synchronously. - evnot.targetList.Send(args.ToEvent(true), targetIDSet, evnot.targetResCh, globalAPIConfig.isSyncEventsEnabled()) + evnot.targetList.Send(args.ToEvent(true), targetIDSet, globalAPIConfig.isSyncEventsEnabled()) } type eventArgs struct { diff --git a/cmd/server-main.go b/cmd/server-main.go index 8bc9cf586..3b970b0ff 100644 --- a/cmd/server-main.go +++ b/cmd/server-main.go @@ -321,7 +321,7 @@ func initAllSubsystems(ctx context.Context) { globalNotificationSys = NewNotificationSys(globalEndpoints) // Create new notification system - globalEventNotifier = NewEventNotifier() + globalEventNotifier = NewEventNotifier(GlobalContext) // Create new bucket metadata system. if globalBucketMetadataSys == nil { diff --git a/internal/config/notify/parse.go b/internal/config/notify/parse.go index 34a8c6b7a..a30c2313d 100644 --- a/internal/config/notify/parse.go +++ b/internal/config/notify/parse.go @@ -249,7 +249,7 @@ func fetchSubSysTargets(ctx context.Context, cfg config.Config, subSys string, t // FetchEnabledTargets - Returns a set of configured TargetList func FetchEnabledTargets(ctx context.Context, cfg config.Config, transport *http.Transport) (_ *event.TargetList, err error) { - targetList := event.NewTargetList() + targetList := event.NewTargetList(ctx) for _, subSys := range config.NotifySubSystems.ToSlice() { targets, err := fetchSubSysTargets(ctx, cfg, subSys, transport) if err != nil { diff --git a/internal/event/config_test.go b/internal/event/config_test.go index 8bf6dddac..df0b2d9ba 100644 --- a/internal/event/config_test.go +++ b/internal/event/config_test.go @@ -18,6 +18,7 @@ package event import ( + "context" "encoding/xml" "reflect" "strings" @@ -251,9 +252,9 @@ func TestQueueValidate(t *testing.T) { panic(err) } - targetList1 := NewTargetList() + targetList1 := NewTargetList(context.Background()) - targetList2 := NewTargetList() + targetList2 := NewTargetList(context.Background()) if err := targetList2.Add(&ExampleTarget{TargetID{"1", "webhook"}, false, false}); err != nil { panic(err) } @@ -595,9 +596,9 @@ func TestConfigValidate(t *testing.T) { panic(err) } - targetList1 := NewTargetList() + targetList1 := NewTargetList(context.Background()) - targetList2 := NewTargetList() + targetList2 := NewTargetList(context.Background()) if err := targetList2.Add(&ExampleTarget{TargetID{"1", "webhook"}, false, false}); err != nil { panic(err) } @@ -927,9 +928,9 @@ func TestParseConfig(t *testing.T) { `) - targetList1 := NewTargetList() + targetList1 := NewTargetList(context.Background()) - targetList2 := NewTargetList() + targetList2 := NewTargetList(context.Background()) if err := targetList2.Add(&ExampleTarget{TargetID{"1", "webhook"}, false, false}); err != nil { panic(err) } diff --git a/internal/event/targetlist.go b/internal/event/targetlist.go index 8755f613f..3945676a5 100644 --- a/internal/event/targetlist.go +++ b/internal/event/targetlist.go @@ -18,17 +18,21 @@ package event import ( + "context" "fmt" + "runtime" "strings" "sync" "sync/atomic" + "github.com/minio/minio/internal/logger" "github.com/minio/minio/internal/store" + "github.com/minio/pkg/workers" ) const ( // The maximum allowed number of concurrent Send() calls to all configured notifications targets - maxConcurrentTargetSendCalls = 20000 + maxConcurrentAsyncSend = 50000 ) // Target - event target interface @@ -49,9 +53,10 @@ type TargetStore interface { // TargetStats is a collection of stats for multiple targets. type TargetStats struct { // CurrentSendCalls is the number of concurrent async Send calls to all targets - CurrentSendCalls int64 - TotalEvents int64 - EventsSkipped int64 + CurrentSendCalls int64 + TotalEvents int64 + EventsSkipped int64 + CurrentQueuedCalls int64 TargetStats map[string]TargetStat } @@ -65,12 +70,19 @@ type TargetStat struct { // TargetList - holds list of targets indexed by target ID. type TargetList struct { // The number of concurrent async Send calls to all targets - currentSendCalls int64 - totalEvents int64 - eventsSkipped int64 + currentSendCalls atomic.Int64 + totalEvents atomic.Int64 + eventsSkipped atomic.Int64 sync.RWMutex targets map[TargetID]Target + queue chan asyncEvent + ctx context.Context +} + +type asyncEvent struct { + ev Event + targetSet TargetIDSet } // Add - adds unique target to target list. @@ -150,6 +162,14 @@ func (list *TargetList) List() []TargetID { return keys } +func (list *TargetList) get(id TargetID) (Target, bool) { + list.RLock() + defer list.RUnlock() + + target, ok := list.targets[id] + return target, ok +} + // TargetMap - returns available targets. func (list *TargetList) TargetMap() map[TargetID]Target { list.RLock() @@ -163,46 +183,57 @@ func (list *TargetList) TargetMap() map[TargetID]Target { } // Send - sends events to targets identified by target IDs. -func (list *TargetList) Send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult, synchronous bool) { - if atomic.LoadInt64(&list.currentSendCalls) > maxConcurrentTargetSendCalls { - atomic.AddInt64(&list.eventsSkipped, 1) - err := fmt.Errorf("concurrent target notifications exceeded %d", maxConcurrentTargetSendCalls) - for id := range targetIDset { - resCh <- TargetIDResult{ID: id, Err: err} - } - return +func (list *TargetList) Send(event Event, targetIDset TargetIDSet, sync bool) { + if sync { + list.sendSync(event, targetIDset) + } else { + list.sendAsync(event, targetIDset) } - if synchronous { - list.send(event, targetIDset, resCh) - return - } - go list.send(event, targetIDset, resCh) } -func (list *TargetList) send(event Event, targetIDset TargetIDSet, resCh chan<- TargetIDResult) { +func (list *TargetList) sendSync(event Event, targetIDset TargetIDSet) { var wg sync.WaitGroup for id := range targetIDset { - list.RLock() - target, ok := list.targets[id] - list.RUnlock() - if ok { - wg.Add(1) - go func(id TargetID, target Target) { - atomic.AddInt64(&list.currentSendCalls, 1) - defer atomic.AddInt64(&list.currentSendCalls, -1) - defer wg.Done() - tgtRes := TargetIDResult{ID: id} - if err := target.Save(event); err != nil { - tgtRes.Err = err - } - resCh <- tgtRes - }(id, target) - } else { - resCh <- TargetIDResult{ID: id} + target, ok := list.get(id) + if !ok { + continue } + wg.Add(1) + go func(id TargetID, target Target) { + list.currentSendCalls.Add(1) + defer list.currentSendCalls.Add(-1) + defer wg.Done() + + if err := target.Save(event); err != nil { + reqInfo := &logger.ReqInfo{} + reqInfo.AppendTags("targetID", id.String()) + logger.LogOnceIf(logger.SetReqInfo(context.Background(), reqInfo), err, id.String()) + } + }(id, target) } wg.Wait() - atomic.AddInt64(&list.totalEvents, 1) + list.totalEvents.Add(1) +} + +func (list *TargetList) sendAsync(event Event, targetIDset TargetIDSet) { + select { + case list.queue <- asyncEvent{ + ev: event, + targetSet: targetIDset.Clone(), + }: + case <-list.ctx.Done(): + list.eventsSkipped.Add(int64(len(list.queue))) + return + default: + list.eventsSkipped.Add(1) + err := fmt.Errorf("concurrent target notifications exceeded %d, notification endpoint is too slow to accept events on incoming requests", maxConcurrentAsyncSend) + for id := range targetIDset { + reqInfo := &logger.ReqInfo{} + reqInfo.AppendTags("targetID", id.String()) + logger.LogOnceIf(logger.SetReqInfo(context.Background(), reqInfo), err, id.String()) + } + return + } } // Stats returns stats for targets. @@ -211,9 +242,10 @@ func (list *TargetList) Stats() TargetStats { if list == nil { return t } - t.CurrentSendCalls = atomic.LoadInt64(&list.currentSendCalls) - t.EventsSkipped = atomic.LoadInt64(&list.eventsSkipped) - t.TotalEvents = atomic.LoadInt64(&list.totalEvents) + t.CurrentSendCalls = list.currentSendCalls.Load() + t.EventsSkipped = list.eventsSkipped.Load() + t.TotalEvents = list.totalEvents.Load() + t.CurrentQueuedCalls = int64(len(list.queue)) list.RLock() defer list.RUnlock() @@ -228,7 +260,48 @@ func (list *TargetList) Stats() TargetStats { return t } -// NewTargetList - creates TargetList. -func NewTargetList() *TargetList { - return &TargetList{targets: make(map[TargetID]Target)} +func (list *TargetList) startSendWorkers(workerCount int) { + if workerCount == 0 { + workerCount = runtime.GOMAXPROCS(0) + } + wk, err := workers.New(workerCount) + if err != nil { + panic(err) + } + for i := 0; i < workerCount; i++ { + wk.Take() + go func() { + defer wk.Give() + + for { + select { + case av := <-list.queue: + list.sendSync(av.ev, av.targetSet) + case <-list.ctx.Done(): + return + } + } + }() + } + wk.Wait() +} + +var startOnce sync.Once + +// Init initialize target send workers. +func (list *TargetList) Init(workers int) *TargetList { + startOnce.Do(func() { + go list.startSendWorkers(workers) + }) + return list +} + +// NewTargetList - creates TargetList. +func NewTargetList(ctx context.Context) *TargetList { + list := &TargetList{ + targets: make(map[TargetID]Target), + queue: make(chan asyncEvent, maxConcurrentAsyncSend), + ctx: ctx, + } + return list } diff --git a/internal/event/targetlist_test.go b/internal/event/targetlist_test.go index c9c7edb16..51b9678b5 100644 --- a/internal/event/targetlist_test.go +++ b/internal/event/targetlist_test.go @@ -18,6 +18,7 @@ package event import ( + "context" "crypto/rand" "errors" "reflect" @@ -85,14 +86,14 @@ func (target ExampleTarget) FlushQueueStore() error { } func TestTargetListAdd(t *testing.T) { - targetListCase1 := NewTargetList() + targetListCase1 := NewTargetList(context.Background()) - targetListCase2 := NewTargetList() + targetListCase2 := NewTargetList(context.Background()) if err := targetListCase2.Add(&ExampleTarget{TargetID{"2", "testcase"}, false, false}); err != nil { panic(err) } - targetListCase3 := NewTargetList() + targetListCase3 := NewTargetList(context.Background()) if err := targetListCase3.Add(&ExampleTarget{TargetID{"3", "testcase"}, false, false}); err != nil { panic(err) } @@ -140,14 +141,14 @@ func TestTargetListAdd(t *testing.T) { } func TestTargetListExists(t *testing.T) { - targetListCase1 := NewTargetList() + targetListCase1 := NewTargetList(context.Background()) - targetListCase2 := NewTargetList() + targetListCase2 := NewTargetList(context.Background()) if err := targetListCase2.Add(&ExampleTarget{TargetID{"2", "testcase"}, false, false}); err != nil { panic(err) } - targetListCase3 := NewTargetList() + targetListCase3 := NewTargetList(context.Background()) if err := targetListCase3.Add(&ExampleTarget{TargetID{"3", "testcase"}, false, false}); err != nil { panic(err) } @@ -172,14 +173,14 @@ func TestTargetListExists(t *testing.T) { } func TestTargetListList(t *testing.T) { - targetListCase1 := NewTargetList() + targetListCase1 := NewTargetList(context.Background()) - targetListCase2 := NewTargetList() + targetListCase2 := NewTargetList(context.Background()) if err := targetListCase2.Add(&ExampleTarget{TargetID{"2", "testcase"}, false, false}); err != nil { panic(err) } - targetListCase3 := NewTargetList() + targetListCase3 := NewTargetList(context.Background()) if err := targetListCase3.Add(&ExampleTarget{TargetID{"3", "testcase"}, false, false}); err != nil { panic(err) } @@ -218,51 +219,8 @@ func TestTargetListList(t *testing.T) { } } -func TestTargetListSend(t *testing.T) { - targetListCase1 := NewTargetList() - - targetListCase2 := NewTargetList() - if err := targetListCase2.Add(&ExampleTarget{TargetID{"2", "testcase"}, false, false}); err != nil { - panic(err) - } - - targetListCase3 := NewTargetList() - if err := targetListCase3.Add(&ExampleTarget{TargetID{"3", "testcase"}, false, false}); err != nil { - panic(err) - } - - targetListCase4 := NewTargetList() - if err := targetListCase4.Add(&ExampleTarget{TargetID{"4", "testcase"}, true, false}); err != nil { - panic(err) - } - - testCases := []struct { - targetList *TargetList - targetID TargetID - expectErr bool - }{ - {targetListCase1, TargetID{"1", "webhook"}, false}, - {targetListCase2, TargetID{"1", "non-existent"}, false}, - {targetListCase3, TargetID{"3", "testcase"}, false}, - {targetListCase4, TargetID{"4", "testcase"}, true}, - } - - resCh := make(chan TargetIDResult) - for i, testCase := range testCases { - testCase.targetList.Send(Event{}, map[TargetID]struct{}{ - testCase.targetID: {}, - }, resCh, false) - res := <-resCh - expectErr := (res.Err != nil) - - if expectErr != testCase.expectErr { - t.Fatalf("test %v: error: expected: %v, got: %v", i+1, testCase.expectErr, expectErr) - } - } -} - func TestNewTargetList(t *testing.T) { - if result := NewTargetList(); result == nil { + if result := NewTargetList(context.Background()); result == nil { t.Fatalf("test: result: expected: , got: ") } }