From c9349747ca184c4c9ca8d8d0e299acd5ca0c1614 Mon Sep 17 00:00:00 2001 From: Praveen raj Mani Date: Tue, 23 Jul 2019 23:07:25 +0530 Subject: [PATCH] Enable event-persistence in NATS and NATS-Streaming (#7612) --- cmd/admin-handlers_test.go | 2 + cmd/config-current.go | 4 +- cmd/config-current_test.go | 2 +- docs/bucket/notifications/README.md | 56 +++++++-- docs/config/config.sample.json | 2 + pkg/event/target/nats.go | 172 ++++++++++++++++++++++------ 6 files changed, 189 insertions(+), 49 deletions(-) diff --git a/cmd/admin-handlers_test.go b/cmd/admin-handlers_test.go index d460d09b3..36ceb96f7 100644 --- a/cmd/admin-handlers_test.go +++ b/cmd/admin-handlers_test.go @@ -152,6 +152,8 @@ var ( "token": "", "secure": false, "pingInterval": 0, + "queueDir": "", + "queueLimit": 0, "streaming": { "enable": false, "clusterID": "", diff --git a/cmd/config-current.go b/cmd/config-current.go index 2b53d2c15..4c78ac420 100644 --- a/cmd/config-current.go +++ b/cmd/config-current.go @@ -358,7 +358,7 @@ func (s *serverConfig) TestNotificationTargets() error { if !v.Enable { continue } - t, err := target.NewNATSTarget(k, v) + t, err := target.NewNATSTarget(k, v, GlobalServiceDoneCh) if err != nil { return fmt.Errorf("nats(%s): %s", k, err.Error()) } @@ -710,7 +710,7 @@ func getNotificationTargets(config *serverConfig) *event.TargetList { for id, args := range config.Notify.NATS { if args.Enable { - newTarget, err := target.NewNATSTarget(id, args) + newTarget, err := target.NewNATSTarget(id, args, GlobalServiceDoneCh) if err != nil { logger.LogIf(context.Background(), err) continue diff --git a/cmd/config-current_test.go b/cmd/config-current_test.go index 3c18d669e..59cfb2843 100644 --- a/cmd/config-current_test.go +++ b/cmd/config-current_test.go @@ -188,7 +188,7 @@ func TestValidateConfig(t *testing.T) { {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "amqp": { "1": { "enable": true, "url": "", "exchange": "", "routingKey": "", "exchangeType": "", "mandatory": false, "immediate": false, "durable": false, "internal": false, "noWait": false, "autoDeleted": false }}}}`, false}, // Test 12 - Test NATS - {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "nats": { "1": { "enable": true, "address": "", "subject": "", "username": "", "password": "", "token": "", "secure": false, "pingInterval": 0, "streaming": { "enable": false, "clusterID": "", "async": false, "maxPubAcksInflight": 0 } } }}}`, false}, + {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "nats": { "1": { "enable": true, "address": "", "subject": "", "username": "", "password": "", "token": "", "secure": false, "pingInterval": 0, "queueDir": "", "queueLimit": 0, "streaming": { "enable": false, "clusterID": "", "async": false, "maxPubAcksInflight": 0 } } }}}`, false}, // Test 13 - Test ElasticSearch {`{"version": "` + v + `", "credential": { "accessKey": "minio", "secretKey": "minio123" }, "region": "us-east-1", "browser": "on", "notify": { "elasticsearch": { "1": { "enable": true, "url": "", "index": "" } }}}`, false}, diff --git a/docs/bucket/notifications/README.md b/docs/bucket/notifications/README.md index b166681ed..4c3f123b7 100644 --- a/docs/bucket/notifications/README.md +++ b/docs/bucket/notifications/README.md @@ -547,11 +547,12 @@ The NATS configuration block in `config.json` is as follows: "password": "yoursecret", "token": "", "secure": false, - "pingInterval": 0 + "pingInterval": 0, + "queueDir": "", + "queueLimit": 0, "streaming": { "enable": false, "clusterID": "", - "clientID": "", "async": false, "maxPubAcksInflight": 0 } @@ -559,6 +560,8 @@ The NATS configuration block in `config.json` is as follows: }, ``` +MinIO supports persistent event store. The persistent store will backup events when the NATS broker goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000. + To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally. ```sh @@ -571,7 +574,7 @@ After updating the NATS configuration in /tmp/myconfig , use `mc admin config se $ mc admin config set myminio < /tmp/myconfig ``` -MinIO server also supports [NATS Streaming mode](http://nats.io/documentation/streaming/nats-streaming-intro/) that offers additional functionality like `Message/event persistence`, `At-least-once-delivery`, and `Publisher rate limiting`. To configure MinIO server to send notifications to NATS Streaming server, update the MinIO server configuration file as follows: +MinIO server also supports [NATS Streaming mode](http://nats.io/documentation/streaming/nats-streaming-intro/) that offers additional functionality like `At-least-once-delivery`, and `Publisher rate limiting`. To configure MinIO server to send notifications to NATS Streaming server, update the MinIO server configuration file as follows: ``` "nats": { @@ -584,10 +587,11 @@ MinIO server also supports [NATS Streaming mode](http://nats.io/documentation/st "token": "", "secure": false, "pingInterval": 0, + "queueDir": "", + "queueLimit": 0, "streaming": { "enable": true, "clusterID": "test-cluster", - "clientID": "minio-client", "async": true, "maxPubAcksInflight": 10 } @@ -677,20 +681,47 @@ import ( ) func main() { - natsConnection, _ := stan.Connect("test-cluster", "test-client") - log.Println("Connected") + + var stanConnection stan.Conn + + subscribe := func() { + fmt.Printf("Subscribing to subject 'bucketevents'\n") + stanConnection.Subscribe("bucketevents", func(m *stan.Msg) { + + // Handle the message + fmt.Printf("Received a message: %s\n", string(m.Data)) + }) + } + + + stanConnection, _ = stan.Connect("test-cluster", "test-client", stan.NatsURL("nats://yourusername:yoursecret@0.0.0.0:4222"), stan.SetConnectionLostHandler(func(c stan.Conn, _ error) { + go func() { + for { + // Reconnect if the connection is lost. + if stanConnection == nil || stanConnection.NatsConn() == nil || !stanConnection.NatsConn().IsConnected() { + stanConnection, _ = stan.Connect("test-cluster", "test-client", stan.NatsURL("nats://yourusername:yoursecret@0.0.0.0:4222"), stan.SetConnectionLostHandler(func(c stan.Conn, _ error) { + if c.NatsConn() != nil { + c.NatsConn().Close() + } + _ = c.Close() + })) + if stanConnection != nil { + subscribe() + } + + } + } + + }() + })) // Subscribe to subject - log.Printf("Subscribing to subject 'bucketevents'\n") - natsConnection.Subscribe("bucketevents", func(m *stan.Msg) { - - // Handle the message - fmt.Printf("Received a message: %s\n", string(m.Data)) - }) + subscribe() // Keep the connection alive runtime.Goexit() } + ``` ``` @@ -957,6 +988,7 @@ The MinIO server configuration file is stored on the backend in json format. Upd } } ``` + MinIO supports persistent event store. The persistent store will backup events when the kafka broker goes offline and replays it when the broker comes back online. The event store can be configured by setting the directory path in `queueDir` field and the maximum limit of events in the queueDir in `queueLimit` field. For eg, the `queueDir` can be `/home/events` and `queueLimit` can be `1000`. By default, the `queueLimit` is set to 10000. To update the configuration, use `mc admin config get` command to get the current configuration file for the minio deployment in json format, and save it locally. diff --git a/docs/config/config.sample.json b/docs/config/config.sample.json index d43e8df2a..4f334f798 100644 --- a/docs/config/config.sample.json +++ b/docs/config/config.sample.json @@ -117,6 +117,8 @@ "token": "", "secure": false, "pingInterval": 0, + "queueDir": "", + "queueLimit": 0, "streaming": { "enable": false, "clusterID": "", diff --git a/pkg/event/target/nats.go b/pkg/event/target/nats.go index 32d8d1c34..65ebf0852 100644 --- a/pkg/event/target/nats.go +++ b/pkg/event/target/nats.go @@ -20,6 +20,8 @@ import ( "encoding/json" "errors" "net/url" + "os" + "path/filepath" "github.com/minio/minio/pkg/event" xnet "github.com/minio/minio/pkg/net" @@ -37,6 +39,8 @@ type NATSArgs struct { Token string `json:"token"` Secure bool `json:"secure"` PingInterval int64 `json:"pingInterval"` + QueueDir string `json:"queueDir"` + QueueLimit uint64 `json:"queueLimit"` Streaming struct { Enable bool `json:"enable"` ClusterID string `json:"clusterID"` @@ -65,15 +69,57 @@ func (n NATSArgs) Validate() error { } } + if n.QueueDir != "" { + if !filepath.IsAbs(n.QueueDir) { + return errors.New("queueDir path should be absolute") + } + } + if n.QueueLimit > 10000 { + return errors.New("queueLimit should not exceed 10000") + } + return nil } +// To obtain a nats connection from args. +func (n NATSArgs) connectNats() (*nats.Conn, error) { + options := nats.DefaultOptions + options.Url = "nats://" + n.Address.String() + options.User = n.Username + options.Password = n.Password + options.Token = n.Token + options.Secure = n.Secure + return options.Connect() +} + +// To obtain a streaming connection from args. +func (n NATSArgs) connectStan() (stan.Conn, error) { + scheme := "nats" + if n.Secure { + scheme = "tls" + } + addressURL := scheme + "://" + n.Username + ":" + n.Password + "@" + n.Address.String() + + clientID, err := getNewUUID() + if err != nil { + return nil, err + } + + connOpts := []stan.Option{stan.NatsURL(addressURL)} + if n.Streaming.MaxPubAcksInflight > 0 { + connOpts = append(connOpts, stan.MaxPubAcksInflight(n.Streaming.MaxPubAcksInflight)) + } + + return stan.Connect(n.Streaming.ClusterID, clientID, connOpts...) +} + // NATSTarget - NATS target. type NATSTarget struct { id event.TargetID args NATSArgs natsConn *nats.Conn stanConn stan.Conn + store Store } // ID - returns target ID. @@ -81,11 +127,24 @@ func (target *NATSTarget) ID() event.TargetID { return target.id } -// Save - Sends event directly without persisting. +// Save - saves the events to the store which will be replayed when the Nats connection is active. func (target *NATSTarget) Save(eventData event.Event) error { + if target.store != nil { + return target.store.Put(eventData) + } + if target.args.Streaming.Enable { + if !target.stanConn.NatsConn().IsConnected() { + return errNotConnected + } + } else { + if !target.natsConn.IsConnected() { + return errNotConnected + } + } return target.send(eventData) } +// send - sends an event to the Nats. func (target *NATSTarget) send(eventData event.Event) error { objectName, err := url.QueryUnescape(eventData.S3.Object.Key) if err != nil { @@ -107,18 +166,62 @@ func (target *NATSTarget) send(eventData event.Event) error { } else { err = target.natsConn.Publish(target.args.Subject, data) } - return err } -// Send - interface compatible method does no-op. +// Send - sends event to Nats. func (target *NATSTarget) Send(eventKey string) error { - return nil + var connErr error + + if target.args.Streaming.Enable { + if target.stanConn == nil || target.stanConn.NatsConn() == nil { + target.stanConn, connErr = target.args.connectStan() + } else { + if !target.stanConn.NatsConn().IsConnected() { + return errNotConnected + } + } + } else { + if target.natsConn == nil { + target.natsConn, connErr = target.args.connectNats() + } else { + if !target.natsConn.IsConnected() { + return errNotConnected + } + } + } + + if connErr != nil { + if connErr.Error() == nats.ErrNoServers.Error() { + return errNotConnected + } + return connErr + } + + eventData, eErr := target.store.Get(eventKey) + if eErr != nil { + // The last event key in a successful batch will be sent in the channel atmost once by the replayEvents() + // Such events will not exist and wouldve been already been sent successfully. + if os.IsNotExist(eErr) { + return nil + } + return eErr + } + + if err := target.send(eventData); err != nil { + return err + } + + return target.store.Del(eventKey) } // Close - closes underneath connections to NATS server. func (target *NATSTarget) Close() (err error) { if target.stanConn != nil { + // closing the streaming connection does not close the provided NATS connection. + if target.stanConn.NatsConn() != nil { + target.stanConn.NatsConn().Close() + } err = target.stanConn.Close() } @@ -130,47 +233,48 @@ func (target *NATSTarget) Close() (err error) { } // NewNATSTarget - creates new NATS target. -func NewNATSTarget(id string, args NATSArgs) (*NATSTarget, error) { +func NewNATSTarget(id string, args NATSArgs, doneCh <-chan struct{}) (*NATSTarget, error) { var natsConn *nats.Conn var stanConn stan.Conn - var clientID string + var err error - if args.Streaming.Enable { - scheme := "nats" - if args.Secure { - scheme = "tls" - } - addressURL := scheme + "://" + args.Username + ":" + args.Password + "@" + args.Address.String() + var store Store - clientID, err = getNewUUID() - if err != nil { + if args.QueueDir != "" { + queueDir := filepath.Join(args.QueueDir, storePrefix+"-nats-"+id) + store = NewQueueStore(queueDir, args.QueueLimit) + if oErr := store.Open(); oErr != nil { + return nil, oErr + } + } + + if args.Streaming.Enable { + stanConn, err = args.connectStan() + } else { + natsConn, err = args.connectNats() + } + + if err != nil { + if store == nil || err.Error() != nats.ErrNoServers.Error() { return nil, err } - - connOpts := []stan.Option{stan.NatsURL(addressURL)} - if args.Streaming.MaxPubAcksInflight > 0 { - connOpts = append(connOpts, stan.MaxPubAcksInflight(args.Streaming.MaxPubAcksInflight)) - } - - stanConn, err = stan.Connect(args.Streaming.ClusterID, clientID, connOpts...) - } else { - options := nats.DefaultOptions - options.Url = "nats://" + args.Address.String() - options.User = args.Username - options.Password = args.Password - options.Token = args.Token - options.Secure = args.Secure - natsConn, err = options.Connect() - } - if err != nil { - return nil, err } - return &NATSTarget{ + target := &NATSTarget{ id: event.TargetID{ID: id, Name: "nats"}, args: args, stanConn: stanConn, natsConn: natsConn, - }, nil + store: store, + } + + if target.store != nil { + // Replays the events from the store. + eventKeyCh := replayEvents(target.store, doneCh) + // Start replaying events from the store. + go sendEvents(target, eventKeyCh, doneCh) + } + + return target, nil }