vault/sdk/logical/events.go
Vault Automation 6c52175b8d
events: send events from primary to secondary clusters (#8214) (#11451)
Collect event subscriber filters on the active node of a cluster as
"cluster wide" filters, and send them from the secondary active to the
primary active node (`SendSecondaryFilters rpc`). The primary active
node forwards events downstream to the secondary active node if the
events match the secondary cluster's subscriber filters
(`RecvPrimaryEvents rpc`). Then the events are further distributed
around the secondary cluster via the existing `RecvActiveNodeEvents`
and `SendStandbyFilters` rpc's.

Events are forwarded downstream to the secondary cluster if the mount
exists on the secondary cluster, i.e. events from mounts with
`local=true` aren't forwarded, and events from mounts that are not
replicated via paths-filter aren't forwarded.

(This is the CE portion of the above^^)

Co-authored-by: Theron Voran <tvoran@users.noreply.github.com>
2025-12-18 10:56:55 -08:00

117 lines
4.1 KiB
Go

// Copyright IBM Corp. 2016, 2025
// SPDX-License-Identifier: MPL-2.0
package logical
import (
"context"
"github.com/hashicorp/go-uuid"
"google.golang.org/protobuf/types/known/structpb"
)
// common event metadata keys
const (
// EventMetadataPath is used in event metadata to show the API path the client must have the `subscribe` capability
// on in order to consume the event. It is recommended that the event path metadata field is the API path that was
// invoked in order to generate the event.
//
// For example, the KV plugin would set this to `data/mysecret`. The event system will automatically prepend the
// plugin mount to this path, if present, so it would become `secret/data/mysecret`, for example.
// If this is an auth plugin event, this will additionally be prepended with `auth/`.
EventMetadataPath = "path"
// EventMetadataDataPath is used in event metadata to show the API path that can be used to fetch any underlying
// data. Similar to the `path` event metadata, the event system will automatically prepend the plugin mount to the
// `data_path`.
EventMetadataDataPath = "data_path"
// EventMetadataOperation is used in event metadata to express what operation was performed that generated the
// event, e.g., `read` or `write`.
EventMetadataOperation = "operation"
// EventMetadataModified is used in event metadata when the event attests that the underlying data has been modified
// and might need to be re-fetched (at the EventMetadataDataPath).
EventMetadataModified = "modified"
extraMetadataArgument = "EXTRA_VALUE_AT_END"
)
// ID is an alias to GetId() for CloudEvents compatibility.
func (x *EventReceived) ID() string {
return x.Event.GetId()
}
// NewEvent returns an event with a new, random EID.
func NewEvent() (*EventData, error) {
id, err := uuid.GenerateUUID()
if err != nil {
return nil, err
}
return &EventData{
Id: id,
}, nil
}
// EventType represents a topic, and is a wrapper around eventlogger.EventType.
type EventType string
// EventSender sends events to the common event bus.
type EventSender interface {
SendEvent(ctx context.Context, eventType EventType, event *EventData) error
}
// SendEvent is a convenience method for plugins events to an EventSender, converting the
// metadataPairs to the EventData structure.
func SendEvent(ctx context.Context, sender EventSender, eventType string, metadataPairs ...string) error {
ev, err := NewEvent()
if err != nil {
return err
}
ev.Metadata = &structpb.Struct{Fields: make(map[string]*structpb.Value, (len(metadataPairs)+1)/2)}
for i := 0; i < len(metadataPairs)-1; i += 2 {
ev.Metadata.Fields[metadataPairs[i]] = structpb.NewStringValue(metadataPairs[i+1])
}
if len(metadataPairs)%2 != 0 {
ev.Metadata.Fields[extraMetadataArgument] = structpb.NewStringValue(metadataPairs[len(metadataPairs)-1])
}
return sender.SendEvent(ctx, EventType(eventType), ev)
}
// EventReceivedBexpr is used for evaluating boolean expressions with go-bexpr.
type EventReceivedBexpr struct {
EventType string `bexpr:"event_type"`
Operation string `bexpr:"operation"`
SourcePluginMount string `bexpr:"source_plugin_mount"`
DataPath string `bexpr:"data_path"`
Namespace string `bexpr:"namespace"`
// SourcePluginIsLocal indicates whether the event was generated by a local
// mount plugin or not.
SourcePluginIsLocal bool `bexpr:"source_plugin_is_local"`
}
// BexprDatum returns a copy of EventReceived formatted for use in evaluating go-bexpr boolean expressions.
func (x *EventReceived) BexprDatum() any {
operation := ""
dataPath := ""
if x.Event != nil {
if x.Event.Metadata != nil {
operationValue := x.Event.Metadata.Fields[EventMetadataOperation]
if operationValue != nil {
operation = operationValue.GetStringValue()
}
dataPathValue := x.Event.Metadata.Fields[EventMetadataDataPath]
if dataPathValue != nil {
dataPath = dataPathValue.GetStringValue()
}
}
}
return &EventReceivedBexpr{
EventType: x.EventType,
Operation: operation,
SourcePluginMount: x.PluginInfo.MountPath,
DataPath: dataPath,
Namespace: x.Namespace,
SourcePluginIsLocal: x.PluginInfo.IsLocal,
}
}