diff --git a/changelog/22487.txt b/changelog/22487.txt new file mode 100644 index 0000000000..bc555f05d5 --- /dev/null +++ b/changelog/22487.txt @@ -0,0 +1,6 @@ +```release-note:change +events: `data_path` will include full data path of secret, including name. +``` +```release-note:change +sdk/logical/events: `EventSender` interface method is now `SendEvent` instead of `Send`. +``` diff --git a/http/events_test.go b/http/events_test.go index 7ffed44280..c1d961b5d4 100644 --- a/http/events_test.go +++ b/http/events_test.go @@ -62,7 +62,7 @@ func TestEventsSubscribe(t *testing.T) { pluginInfo := &logical.EventPluginInfo{ MountPath: "secret", } - err = core.Events().SendInternal(namespace.RootContext(context.Background()), namespace.RootNamespace, pluginInfo, logical.EventType(eventType), &logical.EventData{ + err = core.Events().SendEventInternal(namespace.RootContext(context.Background()), namespace.RootNamespace, pluginInfo, logical.EventType(eventType), &logical.EventData{ Id: id, Metadata: nil, EntityIds: nil, diff --git a/sdk/framework/backend.go b/sdk/framework/backend.go index 0d05547d15..a16228eabb 100644 --- a/sdk/framework/backend.go +++ b/sdk/framework/backend.go @@ -730,11 +730,13 @@ func (b *Backend) handleWALRollback(ctx context.Context, req *logical.Request) ( return logical.ErrorResponse(merr.Error()), nil } +// SendEvent is used to send events through the underlying EventSender. +// It returns ErrNoEvents if the events system has not been configured or enabled. func (b *Backend) SendEvent(ctx context.Context, eventType logical.EventType, event *logical.EventData) error { if b.events == nil { return ErrNoEvents } - return b.events.Send(ctx, eventType, event) + return b.events.SendEvent(ctx, eventType, event) } // FieldSchema is a basic schema to describe the format of a path field. diff --git a/sdk/logical/events.go b/sdk/logical/events.go index cbd3f73690..d341f97499 100644 --- a/sdk/logical/events.go +++ b/sdk/logical/events.go @@ -7,6 +7,24 @@ import ( "context" "github.com/hashicorp/go-uuid" + "google.golang.org/protobuf/types/known/structpb" +) + +// common event metadata keys +const ( + // EventMetadataDataPath is used in event metadata to show the API path that can be used to fetch any underlying + // data. For example, the KV plugin would set this to `data/mysecret`. The event system will automatically prepend + // the plugin mount to this path, if present, so it would become `secret/data/mysecret`, for example. + // If this is an auth plugin event, this will additionally be prepended with `auth/`. + EventMetadataDataPath = "data_path" + // EventMetadataOperation is used in event metadata to express what operation was performed that generated the + // event, e.g., `read` or `write`. + EventMetadataOperation = "operation" + // EventMetadataModified is used in event metadata when the event attests that the underlying data has been modified + // and might need to be re-fetched (at the EventMetadataDataPath). + EventMetadataModified = "modified" + + extraMetadataArgument = "EXTRA_VALUE_AT_END" ) // ID is an alias to GetId() for CloudEvents compatibility. @@ -30,5 +48,22 @@ type EventType string // EventSender sends events to the common event bus. type EventSender interface { - Send(ctx context.Context, eventType EventType, event *EventData) error + SendEvent(ctx context.Context, eventType EventType, event *EventData) error +} + +// SendEvent is a convenience method for plugins events to an EventSender, converting the +// metadataPairs to the EventData structure. +func SendEvent(ctx context.Context, sender EventSender, eventType string, metadataPairs ...string) error { + ev, err := NewEvent() + if err != nil { + return err + } + ev.Metadata = &structpb.Struct{Fields: make(map[string]*structpb.Value, (len(metadataPairs)+1)/2)} + for i := 0; i < len(metadataPairs)-1; i += 2 { + ev.Metadata.Fields[metadataPairs[i]] = structpb.NewStringValue(metadataPairs[i+1]) + } + if len(metadataPairs)%2 != 0 { + ev.Metadata.Fields[extraMetadataArgument] = structpb.NewStringValue(metadataPairs[len(metadataPairs)-1]) + } + return sender.SendEvent(ctx, EventType(eventType), ev) } diff --git a/sdk/logical/events_test.go b/sdk/logical/events_test.go new file mode 100644 index 0000000000..eace63c27b --- /dev/null +++ b/sdk/logical/events_test.go @@ -0,0 +1,53 @@ +package logical + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" +) + +type fakeSender struct { + captured *EventData +} + +func (f *fakeSender) SendEvent(ctx context.Context, eventType EventType, event *EventData) error { + f.captured = event + return nil +} + +// TestSendEventWithOddParametersAddsExtraMetadata tests that an extra parameter is added to the metadata +// with a special key to note that it was extra. +func TestSendEventWithOddParametersAddsExtraMetadata(t *testing.T) { + sender := &fakeSender{} + // 0 or 2 arguments are okay + err := SendEvent(context.Background(), sender, "foo") + if err != nil { + t.Fatal(err) + } + m := sender.captured.Metadata.AsMap() + assert.NotContains(t, m, extraMetadataArgument) + err = SendEvent(context.Background(), sender, "foo", "bar", "baz") + if err != nil { + t.Fatal(err) + } + m = sender.captured.Metadata.AsMap() + assert.NotContains(t, m, extraMetadataArgument) + + // 1 or 3 arguments should give result in extraMetadataArgument in metadata + err = SendEvent(context.Background(), sender, "foo", "extra") + if err != nil { + t.Fatal(err) + } + m = sender.captured.Metadata.AsMap() + assert.Contains(t, m, extraMetadataArgument) + assert.Equal(t, "extra", m[extraMetadataArgument]) + + err = SendEvent(context.Background(), sender, "foo", "bar", "baz", "extra") + if err != nil { + t.Fatal(err) + } + m = sender.captured.Metadata.AsMap() + assert.Contains(t, m, extraMetadataArgument) + assert.Equal(t, "extra", m[extraMetadataArgument]) +} diff --git a/sdk/plugin/grpc_events.go b/sdk/plugin/grpc_events.go index 05d788c66c..3a4d50cc93 100644 --- a/sdk/plugin/grpc_events.go +++ b/sdk/plugin/grpc_events.go @@ -23,7 +23,7 @@ type GRPCEventsClient struct { var _ logical.EventSender = (*GRPCEventsClient)(nil) -func (s *GRPCEventsClient) Send(ctx context.Context, eventType logical.EventType, event *logical.EventData) error { +func (s *GRPCEventsClient) SendEvent(ctx context.Context, eventType logical.EventType, event *logical.EventData) error { _, err := s.client.SendEvent(ctx, &pb.SendEventRequest{ EventType: string(eventType), Event: event, @@ -41,7 +41,7 @@ func (s *GRPCEventsServer) SendEvent(ctx context.Context, req *pb.SendEventReque return &pb.Empty{}, nil } - err := s.impl.Send(ctx, logical.EventType(req.EventType), req.Event) + err := s.impl.SendEvent(ctx, logical.EventType(req.EventType), req.Event) if err != nil { return nil, err } diff --git a/vault/eventbus/bus.go b/vault/eventbus/bus.go index c83ef7b881..75d4691cd4 100644 --- a/vault/eventbus/bus.go +++ b/vault/eventbus/bus.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "net/url" + "path" "strings" "sync" "sync/atomic" @@ -21,6 +22,7 @@ import ( "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/sdk/logical" "github.com/ryanuber/go-glob" + "google.golang.org/protobuf/types/known/structpb" ) const ( @@ -35,6 +37,12 @@ var ( ErrNotStarted = errors.New("event broker has not been started") cloudEventsFormatterFilter *cloudevents.FormatterFilter subscriptions atomic.Int64 // keeps track of event subscription count in all event buses + + // these metadata fields will have the plugin mount path prepended to them + metadataPrependPathFields = []string{ + "path", + logical.EventMetadataDataPath, + } ) // EventBus contains the main logic of running an event broker for Vault. @@ -81,12 +89,31 @@ func (bus *EventBus) Start() { } } -// SendInternal sends an event to the event bus and routes it to all relevant subscribers. +// patchMountPath patches the event data's metadata "secret_path" field, if present, to include the mount path prepended. +func patchMountPath(data *logical.EventData, pluginInfo *logical.EventPluginInfo) *logical.EventData { + if pluginInfo == nil || pluginInfo.MountPath == "" || data.Metadata == nil { + return data + } + + for _, field := range metadataPrependPathFields { + if data.Metadata.Fields[field] != nil { + newPath := path.Join(pluginInfo.MountPath, data.Metadata.Fields[field].GetStringValue()) + if pluginInfo.MountClass == "auth" { + newPath = path.Join("auth", newPath) + } + data.Metadata.Fields[field] = structpb.NewStringValue(newPath) + } + } + + return data +} + +// SendEventInternal sends an event to the event bus and routes it to all relevant subscribers. // This function does *not* wait for all subscribers to acknowledge before returning. // This function is meant to be used by trusted internal code, so it can specify details like the namespace // and plugin info. Events from plugins should be routed through WithPlugin(), which will populate // the namespace and plugin info automatically. -func (bus *EventBus) SendInternal(ctx context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error { +func (bus *EventBus) SendEventInternal(ctx context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error { if ns == nil { return namespace.ErrNoNamespace } @@ -94,14 +121,14 @@ func (bus *EventBus) SendInternal(ctx context.Context, ns *namespace.Namespace, return ErrNotStarted } eventReceived := &logical.EventReceived{ - Event: data, + Event: patchMountPath(data, pluginInfo), Namespace: ns.Path, EventType: string(eventType), PluginInfo: pluginInfo, } bus.logger.Info("Sending event", "event", eventReceived) - // We can't easily know when the Send is complete, so we can't call the cancel function. + // We can't easily know when the SendEvent is complete, so we can't call the cancel function. // But, it is called automatically after bus.timeout, so there won't be any leak as long as bus.timeout is not too long. ctx, _ = context.WithTimeout(ctx, bus.timeout) _, err := bus.broker.Send(ctx, eventTypeAll, eventReceived) @@ -126,10 +153,10 @@ func (bus *EventBus) WithPlugin(ns *namespace.Namespace, eventPluginInfo *logica }, nil } -// Send sends an event to the event bus and routes it to all relevant subscribers. +// SendEvent sends an event to the event bus and routes it to all relevant subscribers. // This function does *not* wait for all subscribers to acknowledge before returning. -func (bus *pluginEventBus) Send(ctx context.Context, eventType logical.EventType, data *logical.EventData) error { - return bus.bus.SendInternal(ctx, bus.namespace, bus.pluginInfo, eventType, data) +func (bus *pluginEventBus) SendEvent(ctx context.Context, eventType logical.EventType, data *logical.EventData) error { + return bus.bus.SendEventInternal(ctx, bus.namespace, bus.pluginInfo, eventType, data) } func init() { diff --git a/vault/eventbus/bus_test.go b/vault/eventbus/bus_test.go index acc3a2f823..8546fb565d 100644 --- a/vault/eventbus/bus_test.go +++ b/vault/eventbus/bus_test.go @@ -5,6 +5,7 @@ package eventbus import ( "context" + "encoding/json" "fmt" "sync/atomic" "testing" @@ -14,6 +15,8 @@ import ( "github.com/hashicorp/go-secure-stdlib/strutil" "github.com/hashicorp/vault/helper/namespace" "github.com/hashicorp/vault/sdk/logical" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/structpb" ) // TestBusBasics tests that basic event sending and subscribing function. @@ -31,14 +34,14 @@ func TestBusBasics(t *testing.T) { t.Fatal(err) } - err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType, event) + err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event) if err != ErrNotStarted { t.Errorf("Expected not started error but got: %v", err) } bus.Start() - err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType, event) + err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event) if err != nil { t.Errorf("Expected no error sending: %v", err) } @@ -54,7 +57,7 @@ func TestBusBasics(t *testing.T) { t.Fatal(err) } - err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType, event) + err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event) if err != nil { t.Error(err) } @@ -97,7 +100,7 @@ func TestNamespaceFiltering(t *testing.T) { t.Fatal(err) } - err = bus.SendInternal(ctx, &namespace.Namespace{ + err = bus.SendEventInternal(ctx, &namespace.Namespace{ ID: "abc", Path: "/abc", }, nil, eventType, event) @@ -113,7 +116,7 @@ func TestNamespaceFiltering(t *testing.T) { // okay } - err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType, event) + err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event) if err != nil { t.Error(err) } @@ -163,11 +166,11 @@ func TestBus2Subscriptions(t *testing.T) { t.Fatal(err) } - err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType2, event2) + err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType2, event2) if err != nil { t.Error(err) } - err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType1, event1) + err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType1, event1) if err != nil { t.Error(err) } @@ -255,7 +258,7 @@ func TestBusSubscriptionsCancel(t *testing.T) { if err != nil { t.Fatal(err) } - err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType, event) + err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event) if err != nil { t.Error(err) } @@ -267,7 +270,7 @@ func TestBusSubscriptionsCancel(t *testing.T) { if err != nil { t.Fatal(err) } - err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType, event) + err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event) if err != nil { t.Error(err) } @@ -337,11 +340,11 @@ func TestBusWildcardSubscriptions(t *testing.T) { t.Fatal(err) } - err = bus.SendInternal(ctx, namespace.RootNamespace, nil, barEventType, event2) + err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, barEventType, event2) if err != nil { t.Error(err) } - err = bus.SendInternal(ctx, namespace.RootNamespace, nil, fooEventType, event1) + err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, fooEventType, event1) if err != nil { t.Error(err) } @@ -377,3 +380,124 @@ func TestBusWildcardSubscriptions(t *testing.T) { t.Error("Timeout waiting for event2") } } + +// TestDataPathIsPrependedWithMount tests that "data_path", if present in the +// metadata, is prepended with the plugin's mount. +func TestDataPathIsPrependedWithMount(t *testing.T) { + bus, err := NewEventBus(nil) + if err != nil { + t.Fatal(err) + } + ctx := context.Background() + + fooEventType := logical.EventType("kv/foo") + bus.Start() + + ch, cancel, err := bus.Subscribe(ctx, namespace.RootNamespace, "kv/*") + if err != nil { + t.Fatal(err) + } + defer cancel() + + event, err := logical.NewEvent() + if err != nil { + t.Fatal(err) + } + metadata := map[string]string{ + logical.EventMetadataDataPath: "my/secret/path", + "not_touched": "xyz", + } + metadataBytes, err := json.Marshal(metadata) + if err != nil { + t.Fatal(err) + } + event.Metadata = &structpb.Struct{} + if err := event.Metadata.UnmarshalJSON(metadataBytes); err != nil { + t.Fatal(err) + } + + // no plugin info means nothing should change + err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, fooEventType, event) + if err != nil { + t.Error(err) + } + + timeout := time.After(1 * time.Second) + select { + case message := <-ch: + metadata := message.Payload.(*logical.EventReceived).Event.Metadata.AsMap() + assert.Contains(t, metadata, "not_touched") + assert.Equal(t, "xyz", metadata["not_touched"]) + assert.Contains(t, metadata, "data_path") + assert.Equal(t, "my/secret/path", metadata["data_path"]) + case <-timeout: + t.Error("Timeout waiting for event") + } + + // send with a secrets plugin mounted + pluginInfo := logical.EventPluginInfo{ + MountClass: "secrets", + MountAccessor: "kv_abc", + MountPath: "secret/", + Plugin: "kv", + PluginVersion: "v1.13.1+builtin", + Version: "2", + } + err = bus.SendEventInternal(ctx, namespace.RootNamespace, &pluginInfo, fooEventType, event) + if err != nil { + t.Error(err) + } + + timeout = time.After(1 * time.Second) + select { + case message := <-ch: + metadata := message.Payload.(*logical.EventReceived).Event.Metadata.AsMap() + assert.Contains(t, metadata, "not_touched") + assert.Equal(t, "xyz", metadata["not_touched"]) + assert.Contains(t, metadata, "data_path") + assert.Equal(t, "secret/my/secret/path", metadata["data_path"]) + case <-timeout: + t.Error("Timeout waiting for event") + } + + // send with an auth plugin mounted + pluginInfo = logical.EventPluginInfo{ + MountClass: "auth", + MountAccessor: "kubernetes_abc", + MountPath: "kubernetes/", + Plugin: "vault-plugin-auth-kubernetes", + PluginVersion: "v1.13.1+builtin", + } + event, err = logical.NewEvent() + if err != nil { + t.Fatal(err) + } + metadata = map[string]string{ + logical.EventMetadataDataPath: "my/secret/path", + "not_touched": "xyz", + } + metadataBytes, err = json.Marshal(metadata) + if err != nil { + t.Fatal(err) + } + event.Metadata = &structpb.Struct{} + if err := event.Metadata.UnmarshalJSON(metadataBytes); err != nil { + t.Fatal(err) + } + err = bus.SendEventInternal(ctx, namespace.RootNamespace, &pluginInfo, fooEventType, event) + if err != nil { + t.Error(err) + } + + timeout = time.After(1 * time.Second) + select { + case message := <-ch: + metadata := message.Payload.(*logical.EventReceived).Event.Metadata.AsMap() + assert.Contains(t, metadata, "not_touched") + assert.Equal(t, "xyz", metadata["not_touched"]) + assert.Contains(t, metadata, "data_path") + assert.Equal(t, "auth/kubernetes/my/secret/path", metadata["data_path"]) + case <-timeout: + t.Error("Timeout waiting for event") + } +}