events: Add full api_path; rename Send (#22487)

Biggest change: we rename `Send` to `SendEvent` in `logical.EventSender`..
Initially we picked `Send` to match the underlying go-eventlogger
broker's `Send` method, and to avoid the stuttering of `events.SendEvent`.

However, I think it is more useful for the `logical.EventSender`
interface to use the method `SendEvent` so that, for example,
`framework.Backend` can implement it.

This is a relatively change now that should not affect anything
except the KV plugin, which is being fixed in another PR.

Another change: if the `secret_path` metadata is present, then
the plugin-aware `EventBus` will prepend it with the plugin mount.
This allows the `secret_path` to be the full path to any referenced
secret.

This change is also backwards compatible, since this field was not
present in the KV plugin. (It did use the slightly different `path`
field, which we can keep for now.)
This commit is contained in:
Christopher Swenson 2023-08-23 15:11:22 -07:00 committed by GitHub
parent 9b0540dfbe
commit 925702de10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 270 additions and 23 deletions

6
changelog/22487.txt Normal file
View File

@ -0,0 +1,6 @@
```release-note:change
events: `data_path` will include full data path of secret, including name.
```
```release-note:change
sdk/logical/events: `EventSender` interface method is now `SendEvent` instead of `Send`.
```

View File

@ -62,7 +62,7 @@ func TestEventsSubscribe(t *testing.T) {
pluginInfo := &logical.EventPluginInfo{
MountPath: "secret",
}
err = core.Events().SendInternal(namespace.RootContext(context.Background()), namespace.RootNamespace, pluginInfo, logical.EventType(eventType), &logical.EventData{
err = core.Events().SendEventInternal(namespace.RootContext(context.Background()), namespace.RootNamespace, pluginInfo, logical.EventType(eventType), &logical.EventData{
Id: id,
Metadata: nil,
EntityIds: nil,

View File

@ -730,11 +730,13 @@ func (b *Backend) handleWALRollback(ctx context.Context, req *logical.Request) (
return logical.ErrorResponse(merr.Error()), nil
}
// SendEvent is used to send events through the underlying EventSender.
// It returns ErrNoEvents if the events system has not been configured or enabled.
func (b *Backend) SendEvent(ctx context.Context, eventType logical.EventType, event *logical.EventData) error {
if b.events == nil {
return ErrNoEvents
}
return b.events.Send(ctx, eventType, event)
return b.events.SendEvent(ctx, eventType, event)
}
// FieldSchema is a basic schema to describe the format of a path field.

View File

@ -7,6 +7,24 @@ import (
"context"
"github.com/hashicorp/go-uuid"
"google.golang.org/protobuf/types/known/structpb"
)
// common event metadata keys
const (
// EventMetadataDataPath is used in event metadata to show the API path that can be used to fetch any underlying
// data. For example, the KV plugin would set this to `data/mysecret`. The event system will automatically prepend
// the plugin mount to this path, if present, so it would become `secret/data/mysecret`, for example.
// If this is an auth plugin event, this will additionally be prepended with `auth/`.
EventMetadataDataPath = "data_path"
// EventMetadataOperation is used in event metadata to express what operation was performed that generated the
// event, e.g., `read` or `write`.
EventMetadataOperation = "operation"
// EventMetadataModified is used in event metadata when the event attests that the underlying data has been modified
// and might need to be re-fetched (at the EventMetadataDataPath).
EventMetadataModified = "modified"
extraMetadataArgument = "EXTRA_VALUE_AT_END"
)
// ID is an alias to GetId() for CloudEvents compatibility.
@ -30,5 +48,22 @@ type EventType string
// EventSender sends events to the common event bus.
type EventSender interface {
Send(ctx context.Context, eventType EventType, event *EventData) error
SendEvent(ctx context.Context, eventType EventType, event *EventData) error
}
// SendEvent is a convenience method for plugins events to an EventSender, converting the
// metadataPairs to the EventData structure.
func SendEvent(ctx context.Context, sender EventSender, eventType string, metadataPairs ...string) error {
ev, err := NewEvent()
if err != nil {
return err
}
ev.Metadata = &structpb.Struct{Fields: make(map[string]*structpb.Value, (len(metadataPairs)+1)/2)}
for i := 0; i < len(metadataPairs)-1; i += 2 {
ev.Metadata.Fields[metadataPairs[i]] = structpb.NewStringValue(metadataPairs[i+1])
}
if len(metadataPairs)%2 != 0 {
ev.Metadata.Fields[extraMetadataArgument] = structpb.NewStringValue(metadataPairs[len(metadataPairs)-1])
}
return sender.SendEvent(ctx, EventType(eventType), ev)
}

View File

@ -0,0 +1,53 @@
package logical
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
)
type fakeSender struct {
captured *EventData
}
func (f *fakeSender) SendEvent(ctx context.Context, eventType EventType, event *EventData) error {
f.captured = event
return nil
}
// TestSendEventWithOddParametersAddsExtraMetadata tests that an extra parameter is added to the metadata
// with a special key to note that it was extra.
func TestSendEventWithOddParametersAddsExtraMetadata(t *testing.T) {
sender := &fakeSender{}
// 0 or 2 arguments are okay
err := SendEvent(context.Background(), sender, "foo")
if err != nil {
t.Fatal(err)
}
m := sender.captured.Metadata.AsMap()
assert.NotContains(t, m, extraMetadataArgument)
err = SendEvent(context.Background(), sender, "foo", "bar", "baz")
if err != nil {
t.Fatal(err)
}
m = sender.captured.Metadata.AsMap()
assert.NotContains(t, m, extraMetadataArgument)
// 1 or 3 arguments should give result in extraMetadataArgument in metadata
err = SendEvent(context.Background(), sender, "foo", "extra")
if err != nil {
t.Fatal(err)
}
m = sender.captured.Metadata.AsMap()
assert.Contains(t, m, extraMetadataArgument)
assert.Equal(t, "extra", m[extraMetadataArgument])
err = SendEvent(context.Background(), sender, "foo", "bar", "baz", "extra")
if err != nil {
t.Fatal(err)
}
m = sender.captured.Metadata.AsMap()
assert.Contains(t, m, extraMetadataArgument)
assert.Equal(t, "extra", m[extraMetadataArgument])
}

View File

@ -23,7 +23,7 @@ type GRPCEventsClient struct {
var _ logical.EventSender = (*GRPCEventsClient)(nil)
func (s *GRPCEventsClient) Send(ctx context.Context, eventType logical.EventType, event *logical.EventData) error {
func (s *GRPCEventsClient) SendEvent(ctx context.Context, eventType logical.EventType, event *logical.EventData) error {
_, err := s.client.SendEvent(ctx, &pb.SendEventRequest{
EventType: string(eventType),
Event: event,
@ -41,7 +41,7 @@ func (s *GRPCEventsServer) SendEvent(ctx context.Context, req *pb.SendEventReque
return &pb.Empty{}, nil
}
err := s.impl.Send(ctx, logical.EventType(req.EventType), req.Event)
err := s.impl.SendEvent(ctx, logical.EventType(req.EventType), req.Event)
if err != nil {
return nil, err
}

View File

@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"net/url"
"path"
"strings"
"sync"
"sync/atomic"
@ -21,6 +22,7 @@ import (
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/logical"
"github.com/ryanuber/go-glob"
"google.golang.org/protobuf/types/known/structpb"
)
const (
@ -35,6 +37,12 @@ var (
ErrNotStarted = errors.New("event broker has not been started")
cloudEventsFormatterFilter *cloudevents.FormatterFilter
subscriptions atomic.Int64 // keeps track of event subscription count in all event buses
// these metadata fields will have the plugin mount path prepended to them
metadataPrependPathFields = []string{
"path",
logical.EventMetadataDataPath,
}
)
// EventBus contains the main logic of running an event broker for Vault.
@ -81,12 +89,31 @@ func (bus *EventBus) Start() {
}
}
// SendInternal sends an event to the event bus and routes it to all relevant subscribers.
// patchMountPath patches the event data's metadata "secret_path" field, if present, to include the mount path prepended.
func patchMountPath(data *logical.EventData, pluginInfo *logical.EventPluginInfo) *logical.EventData {
if pluginInfo == nil || pluginInfo.MountPath == "" || data.Metadata == nil {
return data
}
for _, field := range metadataPrependPathFields {
if data.Metadata.Fields[field] != nil {
newPath := path.Join(pluginInfo.MountPath, data.Metadata.Fields[field].GetStringValue())
if pluginInfo.MountClass == "auth" {
newPath = path.Join("auth", newPath)
}
data.Metadata.Fields[field] = structpb.NewStringValue(newPath)
}
}
return data
}
// SendEventInternal sends an event to the event bus and routes it to all relevant subscribers.
// This function does *not* wait for all subscribers to acknowledge before returning.
// This function is meant to be used by trusted internal code, so it can specify details like the namespace
// and plugin info. Events from plugins should be routed through WithPlugin(), which will populate
// the namespace and plugin info automatically.
func (bus *EventBus) SendInternal(ctx context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error {
func (bus *EventBus) SendEventInternal(ctx context.Context, ns *namespace.Namespace, pluginInfo *logical.EventPluginInfo, eventType logical.EventType, data *logical.EventData) error {
if ns == nil {
return namespace.ErrNoNamespace
}
@ -94,14 +121,14 @@ func (bus *EventBus) SendInternal(ctx context.Context, ns *namespace.Namespace,
return ErrNotStarted
}
eventReceived := &logical.EventReceived{
Event: data,
Event: patchMountPath(data, pluginInfo),
Namespace: ns.Path,
EventType: string(eventType),
PluginInfo: pluginInfo,
}
bus.logger.Info("Sending event", "event", eventReceived)
// We can't easily know when the Send is complete, so we can't call the cancel function.
// We can't easily know when the SendEvent is complete, so we can't call the cancel function.
// But, it is called automatically after bus.timeout, so there won't be any leak as long as bus.timeout is not too long.
ctx, _ = context.WithTimeout(ctx, bus.timeout)
_, err := bus.broker.Send(ctx, eventTypeAll, eventReceived)
@ -126,10 +153,10 @@ func (bus *EventBus) WithPlugin(ns *namespace.Namespace, eventPluginInfo *logica
}, nil
}
// Send sends an event to the event bus and routes it to all relevant subscribers.
// SendEvent sends an event to the event bus and routes it to all relevant subscribers.
// This function does *not* wait for all subscribers to acknowledge before returning.
func (bus *pluginEventBus) Send(ctx context.Context, eventType logical.EventType, data *logical.EventData) error {
return bus.bus.SendInternal(ctx, bus.namespace, bus.pluginInfo, eventType, data)
func (bus *pluginEventBus) SendEvent(ctx context.Context, eventType logical.EventType, data *logical.EventData) error {
return bus.bus.SendEventInternal(ctx, bus.namespace, bus.pluginInfo, eventType, data)
}
func init() {

View File

@ -5,6 +5,7 @@ package eventbus
import (
"context"
"encoding/json"
"fmt"
"sync/atomic"
"testing"
@ -14,6 +15,8 @@ import (
"github.com/hashicorp/go-secure-stdlib/strutil"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/logical"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/types/known/structpb"
)
// TestBusBasics tests that basic event sending and subscribing function.
@ -31,14 +34,14 @@ func TestBusBasics(t *testing.T) {
t.Fatal(err)
}
err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType, event)
err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event)
if err != ErrNotStarted {
t.Errorf("Expected not started error but got: %v", err)
}
bus.Start()
err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType, event)
err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event)
if err != nil {
t.Errorf("Expected no error sending: %v", err)
}
@ -54,7 +57,7 @@ func TestBusBasics(t *testing.T) {
t.Fatal(err)
}
err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType, event)
err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event)
if err != nil {
t.Error(err)
}
@ -97,7 +100,7 @@ func TestNamespaceFiltering(t *testing.T) {
t.Fatal(err)
}
err = bus.SendInternal(ctx, &namespace.Namespace{
err = bus.SendEventInternal(ctx, &namespace.Namespace{
ID: "abc",
Path: "/abc",
}, nil, eventType, event)
@ -113,7 +116,7 @@ func TestNamespaceFiltering(t *testing.T) {
// okay
}
err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType, event)
err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event)
if err != nil {
t.Error(err)
}
@ -163,11 +166,11 @@ func TestBus2Subscriptions(t *testing.T) {
t.Fatal(err)
}
err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType2, event2)
err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType2, event2)
if err != nil {
t.Error(err)
}
err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType1, event1)
err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType1, event1)
if err != nil {
t.Error(err)
}
@ -255,7 +258,7 @@ func TestBusSubscriptionsCancel(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType, event)
err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event)
if err != nil {
t.Error(err)
}
@ -267,7 +270,7 @@ func TestBusSubscriptionsCancel(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = bus.SendInternal(ctx, namespace.RootNamespace, nil, eventType, event)
err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, eventType, event)
if err != nil {
t.Error(err)
}
@ -337,11 +340,11 @@ func TestBusWildcardSubscriptions(t *testing.T) {
t.Fatal(err)
}
err = bus.SendInternal(ctx, namespace.RootNamespace, nil, barEventType, event2)
err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, barEventType, event2)
if err != nil {
t.Error(err)
}
err = bus.SendInternal(ctx, namespace.RootNamespace, nil, fooEventType, event1)
err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, fooEventType, event1)
if err != nil {
t.Error(err)
}
@ -377,3 +380,124 @@ func TestBusWildcardSubscriptions(t *testing.T) {
t.Error("Timeout waiting for event2")
}
}
// TestDataPathIsPrependedWithMount tests that "data_path", if present in the
// metadata, is prepended with the plugin's mount.
func TestDataPathIsPrependedWithMount(t *testing.T) {
bus, err := NewEventBus(nil)
if err != nil {
t.Fatal(err)
}
ctx := context.Background()
fooEventType := logical.EventType("kv/foo")
bus.Start()
ch, cancel, err := bus.Subscribe(ctx, namespace.RootNamespace, "kv/*")
if err != nil {
t.Fatal(err)
}
defer cancel()
event, err := logical.NewEvent()
if err != nil {
t.Fatal(err)
}
metadata := map[string]string{
logical.EventMetadataDataPath: "my/secret/path",
"not_touched": "xyz",
}
metadataBytes, err := json.Marshal(metadata)
if err != nil {
t.Fatal(err)
}
event.Metadata = &structpb.Struct{}
if err := event.Metadata.UnmarshalJSON(metadataBytes); err != nil {
t.Fatal(err)
}
// no plugin info means nothing should change
err = bus.SendEventInternal(ctx, namespace.RootNamespace, nil, fooEventType, event)
if err != nil {
t.Error(err)
}
timeout := time.After(1 * time.Second)
select {
case message := <-ch:
metadata := message.Payload.(*logical.EventReceived).Event.Metadata.AsMap()
assert.Contains(t, metadata, "not_touched")
assert.Equal(t, "xyz", metadata["not_touched"])
assert.Contains(t, metadata, "data_path")
assert.Equal(t, "my/secret/path", metadata["data_path"])
case <-timeout:
t.Error("Timeout waiting for event")
}
// send with a secrets plugin mounted
pluginInfo := logical.EventPluginInfo{
MountClass: "secrets",
MountAccessor: "kv_abc",
MountPath: "secret/",
Plugin: "kv",
PluginVersion: "v1.13.1+builtin",
Version: "2",
}
err = bus.SendEventInternal(ctx, namespace.RootNamespace, &pluginInfo, fooEventType, event)
if err != nil {
t.Error(err)
}
timeout = time.After(1 * time.Second)
select {
case message := <-ch:
metadata := message.Payload.(*logical.EventReceived).Event.Metadata.AsMap()
assert.Contains(t, metadata, "not_touched")
assert.Equal(t, "xyz", metadata["not_touched"])
assert.Contains(t, metadata, "data_path")
assert.Equal(t, "secret/my/secret/path", metadata["data_path"])
case <-timeout:
t.Error("Timeout waiting for event")
}
// send with an auth plugin mounted
pluginInfo = logical.EventPluginInfo{
MountClass: "auth",
MountAccessor: "kubernetes_abc",
MountPath: "kubernetes/",
Plugin: "vault-plugin-auth-kubernetes",
PluginVersion: "v1.13.1+builtin",
}
event, err = logical.NewEvent()
if err != nil {
t.Fatal(err)
}
metadata = map[string]string{
logical.EventMetadataDataPath: "my/secret/path",
"not_touched": "xyz",
}
metadataBytes, err = json.Marshal(metadata)
if err != nil {
t.Fatal(err)
}
event.Metadata = &structpb.Struct{}
if err := event.Metadata.UnmarshalJSON(metadataBytes); err != nil {
t.Fatal(err)
}
err = bus.SendEventInternal(ctx, namespace.RootNamespace, &pluginInfo, fooEventType, event)
if err != nil {
t.Error(err)
}
timeout = time.After(1 * time.Second)
select {
case message := <-ch:
metadata := message.Payload.(*logical.EventReceived).Event.Metadata.AsMap()
assert.Contains(t, metadata, "not_touched")
assert.Equal(t, "xyz", metadata["not_touched"])
assert.Contains(t, metadata, "data_path")
assert.Equal(t, "auth/kubernetes/my/secret/path", metadata["data_path"])
case <-timeout:
t.Error("Timeout waiting for event")
}
}