From 3f329fe2d482370aea5ff302c5abca046de6931e Mon Sep 17 00:00:00 2001 From: Christopher Swenson Date: Tue, 17 Jan 2023 13:34:37 -0800 Subject: [PATCH] Add basic event bus broker stub (#18640) Creates a new `eventbus` package under `vault` with an implementation of the `go-eventlogger` broker. Also creates a stub of a common broker that will be accessible in the core, and creates a simple event sending interface. --- go.mod | 1 + go.sum | 7 ++ sdk/logical/events.go | 11 +++ vault/core.go | 7 ++ vault/eventbus/bus.go | 163 +++++++++++++++++++++++++++++++++++++ vault/eventbus/bus_test.go | 100 +++++++++++++++++++++++ 6 files changed, 289 insertions(+) create mode 100644 sdk/logical/events.go create mode 100644 vault/eventbus/bus.go create mode 100644 vault/eventbus/bus_test.go diff --git a/go.mod b/go.mod index ffdd9ed331..918e85ca3e 100644 --- a/go.mod +++ b/go.mod @@ -64,6 +64,7 @@ require ( github.com/hashicorp/consul-template v0.29.5 github.com/hashicorp/consul/api v1.15.2 github.com/hashicorp/errwrap v1.1.0 + github.com/hashicorp/eventlogger v0.1.0 github.com/hashicorp/go-cleanhttp v0.5.2 github.com/hashicorp/go-discover v0.0.0-20210818145131-c573d69da192 github.com/hashicorp/go-gcp-common v0.8.0 diff --git a/go.sum b/go.sum index 26ec28486f..2f3aba93db 100644 --- a/go.sum +++ b/go.sum @@ -745,6 +745,7 @@ github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/go-test/deep v1.0.2/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= +github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8= github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM= github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= @@ -968,6 +969,8 @@ github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FK github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/eventlogger v0.1.0 h1:S6xc4gZVzewuDUP4R4Ngko419h/CGDuV/b4ADL3XLik= +github.com/hashicorp/eventlogger v0.1.0/go.mod h1:a3IXf1aEJfpCPzseTOrwKj4fVW/Qn3oEmpQeaIznzH0= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= @@ -1665,6 +1668,7 @@ github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= +github.com/rogpeppe/go-internal v1.6.2/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.1 h1:geMPLpDpQOgVyCg5z5GoRwLHepNdb71NXb67XFkP+Eg= github.com/rogpeppe/go-internal v1.8.1/go.mod h1:JeRgkft04UBgHMgCIwADu4Pn6Mtm5d4nPKWu0nJ5d+o= github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= @@ -1895,6 +1899,7 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= @@ -2351,6 +2356,7 @@ golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20210101214203-2dba1e4ea05c/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= @@ -2696,6 +2702,7 @@ k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed h1:jAne/RjBTyawwAy0utX5eqigAwz/l k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= layeh.com/radius v0.0.0-20190322222518-890bc1058917 h1:BDXFaFzUt5EIqe/4wrTc4AcYZWP6iC6Ult+jQWLh5eU= layeh.com/radius v0.0.0-20190322222518-890bc1058917/go.mod h1:fywZKyu//X7iRzaxLgPWsvc0L26IUpVvE/aeIL2JtIQ= +mvdan.cc/gofumpt v0.1.1/go.mod h1:yXG1r1WqZVKWbVRtBWKWX9+CxGYfA51nSomhM0woR48= mvdan.cc/gofumpt v0.3.1 h1:avhhrOmv0IuvQVK7fvwV91oFSGAk5/6Po8GXTzICeu8= mvdan.cc/gofumpt v0.3.1/go.mod h1:w3ymliuxvzVx8DAutBnVyDqYb1Niy/yCJt/lk821YCE= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= diff --git a/sdk/logical/events.go b/sdk/logical/events.go new file mode 100644 index 0000000000..801d895546 --- /dev/null +++ b/sdk/logical/events.go @@ -0,0 +1,11 @@ +package logical + +import "context" + +// EventType represents a topic, and is a wrapper around eventlogger.EventType. +type EventType string + +// EventSender sends events to the common event bus. +type EventSender interface { + Send(ctx context.Context, eventType EventType, event any) error +} diff --git a/vault/core.go b/vault/core.go index e837ee98aa..89ed93434b 100644 --- a/vault/core.go +++ b/vault/core.go @@ -53,6 +53,7 @@ import ( sr "github.com/hashicorp/vault/serviceregistration" "github.com/hashicorp/vault/shamir" "github.com/hashicorp/vault/vault/cluster" + "github.com/hashicorp/vault/vault/eventbus" "github.com/hashicorp/vault/vault/quotas" vaultseal "github.com/hashicorp/vault/vault/seal" "github.com/hashicorp/vault/version" @@ -677,6 +678,8 @@ type Core struct { pendingRemovalMountsAllowed bool expirationRevokeRetryBase time.Duration + + events *eventbus.EventBus } func (c *Core) HAState() consts.HAState { @@ -3935,3 +3938,7 @@ func (c *Core) GetRaftAutopilotState(ctx context.Context) (*raft.AutopilotState, return raftBackend.GetAutopilotServerState(ctx) } + +func (c *Core) Events() *eventbus.EventBus { + return c.events +} diff --git a/vault/eventbus/bus.go b/vault/eventbus/bus.go new file mode 100644 index 0000000000..290d8fd43f --- /dev/null +++ b/vault/eventbus/bus.go @@ -0,0 +1,163 @@ +package eventbus + +import ( + "context" + "errors" + "net/url" + "strings" + "sync/atomic" + + "github.com/hashicorp/eventlogger" + "github.com/hashicorp/eventlogger/formatter_filters/cloudevents" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-uuid" + "github.com/hashicorp/vault/sdk/logical" +) + +var ErrNotStarted = errors.New("event broker has not been started") + +var cloudEventsFormatterFilter *cloudevents.FormatterFilter + +// EventBus contains the main logic of running an event broker for Vault. +// Start() must be called before the EventBus will accept events for sending. +type EventBus struct { + logger hclog.Logger + broker *eventlogger.Broker + started atomic.Bool + formatterNodeID eventlogger.NodeID +} + +type asyncChanNode struct { + // TODO: add bounded deque buffer of *any + ch chan any +} + +var _ eventlogger.Node = &asyncChanNode{} + +func init() { + // TODO: maybe this should relate to the Vault core somehow? + sourceUrl, err := url.Parse("https://vaultproject.io/") + if err != nil { + panic(err) + } + cloudEventsFormatterFilter = &cloudevents.FormatterFilter{ + Source: sourceUrl, + Predicate: func(_ context.Context, e interface{}) (bool, error) { + return true, nil + }, + } +} + +func NewEventBus() (*EventBus, error) { + broker := eventlogger.NewBroker() + + formatterID, err := uuid.GenerateUUID() + if err != nil { + return nil, err + } + formatterNodeID := eventlogger.NodeID(formatterID) + err = broker.RegisterNode(formatterNodeID, cloudEventsFormatterFilter) + if err != nil { + return nil, err + } + + return &EventBus{ + logger: hclog.Default().Named("eventbus"), + broker: broker, + formatterNodeID: formatterNodeID, + }, nil +} + +// Start starts the event bus, allowing events to be written. +// It is not possible to stop or restart the event bus. +// It is safe to call Start() multiple times. +func (bus *EventBus) Start() { + bus.started.Store(true) +} + +var _ logical.EventSender = (*EventBus)(nil) + +// Send sends an event to the event bus and routes it to all relevant subscribers. +// This function does *not* wait for all subscribers to acknowledge before returning. +// TODO: use schema once it is defined +func (bus *EventBus) Send(ctx context.Context, eventType logical.EventType, s any) error { + if !bus.started.Load() { + return ErrNotStarted + } + bus.logger.Info("Sending event", "event", s) + _, err := bus.broker.Send(ctx, eventlogger.EventType(eventType), s) + if err != nil { + // if no listeners for this event type are registered, that's okay, the event + // will just not be sent anywhere + if strings.Contains(strings.ToLower(err.Error()), "no graph for eventtype") { + return nil + } + } + return err +} + +func (bus *EventBus) Subscribe(_ context.Context, eventType logical.EventType) (chan any, error) { + // subscriptions are still stored even if the bus has not been started + pipelineID, err := uuid.GenerateUUID() + if err != nil { + return nil, err + } + + nodeID, err := uuid.GenerateUUID() + if err != nil { + return nil, err + } + + // TODO: should we have just one node, and handle all the routing ourselves? + asyncNode := newAsyncNode() + err = bus.broker.RegisterNode(eventlogger.NodeID(nodeID), asyncNode) + if err != nil { + defer asyncNode.Close() + return nil, err + } + + nodes := []eventlogger.NodeID{bus.formatterNodeID, eventlogger.NodeID(nodeID)} + + pipeline := eventlogger.Pipeline{ + PipelineID: eventlogger.PipelineID(pipelineID), + EventType: eventlogger.EventType(eventType), + NodeIDs: nodes, + } + err = bus.broker.RegisterPipeline(pipeline) + if err != nil { + defer asyncNode.Close() + return nil, err + } + return asyncNode.ch, nil +} + +func newAsyncNode() *asyncChanNode { + return &asyncChanNode{ + ch: make(chan any), + } +} + +func (node *asyncChanNode) Close() error { + close(node.ch) + return nil +} + +func (node *asyncChanNode) Process(ctx context.Context, e *eventlogger.Event) (*eventlogger.Event, error) { + // TODO: add timeout on sending to node.ch + // sends to the channel async in another goroutine + go func() { + select { + case node.ch <- e.Payload: + case <-ctx.Done(): + } + }() + return e, nil +} + +func (node *asyncChanNode) Reopen() error { + return nil +} + +func (node *asyncChanNode) Type() eventlogger.NodeType { + return eventlogger.NodeTypeSink +} diff --git a/vault/eventbus/bus_test.go b/vault/eventbus/bus_test.go new file mode 100644 index 0000000000..a1ecc9771e --- /dev/null +++ b/vault/eventbus/bus_test.go @@ -0,0 +1,100 @@ +package eventbus + +import ( + "context" + "testing" + "time" + + "github.com/hashicorp/vault/sdk/logical" +) + +func TestBusBasics(t *testing.T) { + bus, err := NewEventBus() + if err != nil { + t.Fatal(err) + } + ctx := context.Background() + + eventType := logical.EventType("someType") + + err = bus.Send(ctx, eventType, "message") + if err != ErrNotStarted { + t.Errorf("Expected not started error but got: %v", err) + } + + bus.Start() + + err = bus.Send(ctx, eventType, "sent but never received") + if err != nil { + t.Errorf("Expected no error sending: %v", err) + } + + ch, err := bus.Subscribe(ctx, eventType) + if err != nil { + t.Fatal(err) + } + + err = bus.Send(ctx, eventType, "message2") + if err != nil { + t.Error(err) + } + + timeout := time.After(1 * time.Second) + select { + case message := <-ch: + if message != "message2" { + t.Errorf("Got unexpected message: %v", message) + } + case <-timeout: + t.Error("Timeout waiting for message") + } +} + +func TestBus2Subscriptions(t *testing.T) { + bus, err := NewEventBus() + if err != nil { + t.Fatal(err) + } + ctx := context.Background() + + eventType1 := logical.EventType("someType1") + eventType2 := logical.EventType("someType2") + bus.Start() + + ch1, err := bus.Subscribe(ctx, eventType1) + if err != nil { + t.Fatal(err) + } + + ch2, err := bus.Subscribe(ctx, eventType2) + if err != nil { + t.Fatal(err) + } + + err = bus.Send(ctx, eventType2, "message2") + if err != nil { + t.Error(err) + } + err = bus.Send(ctx, eventType1, "message1") + if err != nil { + t.Error(err) + } + + timeout := time.After(1 * time.Second) + select { + case message := <-ch1: + if message != "message1" { + t.Errorf("Got unexpected message: %v", message) + } + case <-timeout: + t.Error("Timeout waiting for message1") + } + select { + case message := <-ch2: + if message != "message2" { + t.Errorf("Got unexpected message: %v", message) + } + case <-timeout: + t.Error("Timeout waiting for message2") + } +}