feat: add support for static extra fields for JSON logs

Fixes #7356

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
This commit is contained in:
Andrey Smirnov 2024-04-01 22:06:48 +04:00
parent 090143b030
commit 117e60583d
No known key found for this signature in database
GPG Key ID: FE042E3D4085A811
14 changed files with 492 additions and 17 deletions

View File

@ -177,6 +177,22 @@ kind: WatchdogTimerConfig
device: /dev/watchdog0
timeout: 3m0s
```
"""
[notes.logging]
title = "Logging"
description = """\
Talos Linux now supports setting extra tags when sending logs in JSON format:
```yaml
machine:
logging:
destinations:
- endpoint: "udp://127.0.0.1:12345/"
format: "json_lines"
extraTags:
server: s03-rack07
```
"""
[make_deps]

View File

@ -23,6 +23,8 @@ import (
networkutils "github.com/siderolabs/talos/internal/app/machined/pkg/controllers/network/utils"
machinedruntime "github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
"github.com/siderolabs/talos/internal/app/machined/pkg/runtime/logging"
"github.com/siderolabs/talos/pkg/machinery/config/config"
"github.com/siderolabs/talos/pkg/machinery/constants"
"github.com/siderolabs/talos/pkg/machinery/resources/network"
"github.com/siderolabs/talos/pkg/machinery/resources/runtime"
)
@ -110,6 +112,22 @@ func (ctrl *KmsgLogDeliveryController) Run(ctx context.Context, r controller.Run
}
}
type logConfig struct {
endpoint *url.URL
}
func (c logConfig) Format() string {
return constants.LoggingFormatJSONLines
}
func (c logConfig) Endpoint() *url.URL {
return c.endpoint
}
func (c logConfig) ExtraTags() map[string]string {
return nil
}
//nolint:gocyclo
func (ctrl *KmsgLogDeliveryController) deliverLogs(ctx context.Context, r controller.Runtime, logger *zap.Logger, kmsgCh <-chan kmsg.Packet, destURLs []*url.URL) error {
if ctrl.drainSub == nil {
@ -117,7 +135,10 @@ func (ctrl *KmsgLogDeliveryController) deliverLogs(ctx context.Context, r contro
}
// initialize all log senders
senders := xslices.Map(destURLs, logging.NewJSONLines)
destLogConfigs := xslices.Map(destURLs, func(u *url.URL) config.LoggingDestination {
return logConfig{endpoint: u}
})
senders := xslices.Map(destLogConfigs, logging.NewJSONLines)
defer func() {
closeCtx, closeCtxCancel := context.WithTimeout(context.Background(), logCloseTimeout)

View File

@ -13,10 +13,12 @@ import (
"time"
"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
"github.com/siderolabs/talos/pkg/machinery/config/config"
)
type jsonLinesSender struct {
endpoint *url.URL
endpoint *url.URL
extraTags map[string]string
sema chan struct{}
conn net.Conn
@ -24,13 +26,15 @@ type jsonLinesSender struct {
// NewJSONLines returns log sender that sends logs in JSON over TCP (newline-delimited)
// or UDP (one message per packet).
func NewJSONLines(endpoint *url.URL) runtime.LogSender {
func NewJSONLines(cfg config.LoggingDestination) runtime.LogSender {
sema := make(chan struct{}, 1)
sema <- struct{}{}
return &jsonLinesSender{
endpoint: endpoint,
sema: sema,
endpoint: cfg.Endpoint(),
extraTags: cfg.ExtraTags(),
sema: sema,
}
}
@ -55,6 +59,10 @@ func (j *jsonLinesSender) marshalJSON(e *runtime.LogEvent) ([]byte, error) {
m["talos-time"] = e.Time.Format(time.RFC3339Nano)
m["talos-level"] = e.Level.String()
for k, v := range j.extraTags {
m[k] = v
}
return json.Marshal(m)
}

View File

@ -0,0 +1,336 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package logging_test
import (
"bufio"
"context"
"encoding/json"
"net"
"net/url"
"sync"
"testing"
"time"
"github.com/siderolabs/gen/channel"
"github.com/siderolabs/gen/ensure"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"
"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
"github.com/siderolabs/talos/internal/app/machined/pkg/runtime/logging"
"github.com/siderolabs/talos/pkg/machinery/constants"
)
func udpHandler(ctx context.Context, t *testing.T, conn net.PacketConn, sendCh chan<- []byte) {
t.Helper()
for {
select {
case <-ctx.Done():
return
default:
}
if err := conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond)); err != nil {
t.Logf("failed to set read deadline: %v", err)
return
}
buf := make([]byte, 1024)
n, _, err := conn.ReadFrom(buf)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
continue
}
t.Logf("failed to read from UDP connection: %v", err)
return
}
if !channel.SendWithContext(ctx, sendCh, buf[:n]) {
return
}
}
}
func tcpHandler(ctx context.Context, t *testing.T, conn net.Listener, sendCh chan<- []byte) {
t.Helper()
for {
select {
case <-ctx.Done():
return
default:
}
if err := conn.(*net.TCPListener).SetDeadline(time.Now().Add(10 * time.Millisecond)); err != nil {
t.Logf("failed to set accept deadline: %v", err)
return
}
c, err := conn.Accept()
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
continue
}
t.Logf("failed to accept UDP connection: %v", err)
return
}
go func() {
defer c.Close() //nolint:errcheck
scanner := bufio.NewScanner(c)
for scanner.Scan() {
if !channel.SendWithContext(ctx, sendCh, scanner.Bytes()) {
return
}
}
}()
}
}
type loggingDestination struct {
endpoint *url.URL
extraTags map[string]string
}
func (l *loggingDestination) Endpoint() *url.URL {
return l.endpoint
}
func (l *loggingDestination) ExtraTags() map[string]string {
return l.extraTags
}
func (l *loggingDestination) Format() string {
return constants.LoggingFormatJSONLines
}
func TestSenderJSONLines(t *testing.T) { //nolint:tparallel
t.Parallel()
lisUDP, err := net.ListenPacket("udp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, lisUDP.Close())
})
lisTCP, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, lisTCP.Close())
})
udpEndpoint := lisUDP.LocalAddr().String()
tcpEndpoint := lisTCP.Addr().String()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
sendCh := make(chan []byte, 32)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
udpHandler(ctx, t, lisUDP, sendCh)
}()
wg.Add(1)
go func() {
defer wg.Done()
tcpHandler(ctx, t, lisTCP, sendCh)
}()
t.Cleanup(wg.Wait)
for _, test := range []struct {
name string
endpoint *url.URL
extraTags map[string]string
messages []*runtime.LogEvent
expected []map[string]any
}{
{
name: "UDP",
endpoint: ensure.Value(url.Parse("udp://" + udpEndpoint)),
messages: []*runtime.LogEvent{
{
Msg: "msg1",
Time: ensure.Value(time.Parse(time.RFC3339Nano, "2021-01-01T00:00:00Z")),
Level: zapcore.InfoLevel,
Fields: map[string]any{
"field1": "value1",
},
},
{
Msg: "msg2",
Time: ensure.Value(time.Parse(time.RFC3339Nano, "2021-01-01T00:00:01Z")),
Level: zapcore.DebugLevel,
},
},
expected: []map[string]any{
{
"field1": "value1",
"msg": "msg1",
"talos-level": "info",
"talos-time": "2021-01-01T00:00:00Z",
},
{
"msg": "msg2",
"talos-level": "debug",
"talos-time": "2021-01-01T00:00:01Z",
},
},
},
{
name: "UDP with extra tags",
endpoint: ensure.Value(url.Parse("udp://" + udpEndpoint)),
extraTags: map[string]string{
"extra1": "value1",
},
messages: []*runtime.LogEvent{
{
Msg: "msg1",
Time: ensure.Value(time.Parse(time.RFC3339Nano, "2021-01-01T00:00:00Z")),
Level: zapcore.InfoLevel,
Fields: map[string]any{
"field1": "value1",
},
},
{
Msg: "msg2",
Time: ensure.Value(time.Parse(time.RFC3339Nano, "2021-01-01T00:00:01Z")),
Level: zapcore.DebugLevel,
},
},
expected: []map[string]any{
{
"field1": "value1",
"extra1": "value1",
"msg": "msg1",
"talos-level": "info",
"talos-time": "2021-01-01T00:00:00Z",
},
{
"msg": "msg2",
"extra1": "value1",
"talos-level": "debug",
"talos-time": "2021-01-01T00:00:01Z",
},
},
},
{
name: "TCP",
endpoint: ensure.Value(url.Parse("tcp://" + tcpEndpoint)),
messages: []*runtime.LogEvent{
{
Msg: "hello",
Time: ensure.Value(time.Parse(time.RFC3339Nano, "2021-01-01T00:00:00Z")),
Level: zapcore.InfoLevel,
Fields: map[string]any{
"field1": "value1",
},
},
},
expected: []map[string]any{
{
"field1": "value1",
"msg": "hello",
"talos-level": "info",
"talos-time": "2021-01-01T00:00:00Z",
},
},
},
{
name: "TCP with extra tags",
endpoint: ensure.Value(url.Parse("tcp://" + tcpEndpoint)),
extraTags: map[string]string{
"extra1": "value1",
},
messages: []*runtime.LogEvent{
{
Msg: "hello",
Time: ensure.Value(time.Parse(time.RFC3339Nano, "2021-01-01T00:00:00Z")),
Level: zapcore.InfoLevel,
Fields: map[string]any{
"field1": "value1",
},
},
},
expected: []map[string]any{
{
"field1": "value1",
"extra1": "value1",
"msg": "hello",
"talos-level": "info",
"talos-time": "2021-01-01T00:00:00Z",
},
},
},
} {
t.Run(test.name, func(t *testing.T) {
// not parallel - need sequential execution
loggingCfg := &loggingDestination{
endpoint: test.endpoint,
extraTags: test.extraTags,
}
sender := logging.NewJSONLines(loggingCfg)
for _, msg := range test.messages {
require.NoError(t, sender.Send(ctx, msg))
}
for _, expected := range test.expected {
select {
case <-time.After(time.Second):
t.Fatalf("timed out waiting for message")
case msg := <-sendCh:
var m map[string]any
require.NoError(t, json.Unmarshal(msg, &m))
require.Equal(t, expected, m)
}
}
require.NoError(t, sender.Close(ctx))
})
}
cancel()
}

View File

@ -352,6 +352,34 @@ func (ctrl *Controller) DependencyGraph() (*controller.DependencyGraph, error) {
return ctrl.controllerRuntime.GetDependencyGraph()
}
type loggingDestination struct {
Format string
Endpoint *url.URL
ExtraTags map[string]string
}
func (a *loggingDestination) Equal(b *loggingDestination) bool {
if a.Format != b.Format {
return false
}
if a.Endpoint.String() != b.Endpoint.String() {
return false
}
if len(a.ExtraTags) != len(b.ExtraTags) {
return false
}
for k, v := range a.ExtraTags {
if vv, ok := b.ExtraTags[k]; !ok || vv != v {
return false
}
}
return true
}
func (ctrl *Controller) watchMachineConfig(ctx context.Context) {
watchCh := make(chan state.Event)
@ -365,7 +393,7 @@ func (ctrl *Controller) watchMachineConfig(ctx context.Context) {
return
}
var loggingEndpoints []*url.URL
var loggingDestinations []loggingDestination
for {
var cfg talosconfig.Config
@ -384,9 +412,9 @@ func (ctrl *Controller) watchMachineConfig(ctx context.Context) {
ctrl.updateConsoleLoggingConfig(cfg.Debug())
if cfg.Machine() == nil {
ctrl.updateLoggingConfig(ctx, nil, &loggingEndpoints)
ctrl.updateLoggingConfig(ctx, nil, &loggingDestinations)
} else {
ctrl.updateLoggingConfig(ctx, cfg.Machine().Logging().Destinations(), &loggingEndpoints)
ctrl.updateLoggingConfig(ctx, cfg.Machine().Logging().Destinations(), &loggingDestinations)
}
}
}
@ -403,23 +431,27 @@ func (ctrl *Controller) updateConsoleLoggingConfig(debug bool) {
}
}
func (ctrl *Controller) updateLoggingConfig(ctx context.Context, dests []talosconfig.LoggingDestination, prevLoggingEndpoints *[]*url.URL) {
loggingEndpoints := make([]*url.URL, len(dests))
func (ctrl *Controller) updateLoggingConfig(ctx context.Context, dests []talosconfig.LoggingDestination, prevLoggingDestinations *[]loggingDestination) {
loggingDestinations := make([]loggingDestination, len(dests))
for i, dest := range dests {
switch f := dest.Format(); f {
case constants.LoggingFormatJSONLines:
loggingEndpoints[i] = dest.Endpoint()
loggingDestinations[i] = loggingDestination{
Format: f,
Endpoint: dest.Endpoint(),
ExtraTags: dest.ExtraTags(),
}
default:
// should not be possible due to validation
panic(fmt.Sprintf("unhandled log destination format %q", f))
}
}
loggingChanged := len(*prevLoggingEndpoints) != len(loggingEndpoints)
loggingChanged := len(*prevLoggingDestinations) != len(loggingDestinations)
if !loggingChanged {
for i, u := range *prevLoggingEndpoints {
if u.String() != loggingEndpoints[i].String() {
for i, u := range *prevLoggingDestinations {
if !u.Equal(&loggingDestinations[i]) {
loggingChanged = true
break
@ -431,12 +463,12 @@ func (ctrl *Controller) updateLoggingConfig(ctx context.Context, dests []talosco
return
}
*prevLoggingEndpoints = loggingEndpoints
*prevLoggingDestinations = loggingDestinations
var prevSenders []runtime.LogSender
if len(loggingEndpoints) > 0 {
senders := xslices.Map(loggingEndpoints, runtimelogging.NewJSONLines)
if len(loggingDestinations) > 0 {
senders := xslices.Map(dests, runtimelogging.NewJSONLines)
ctrl.logger.Info("enabling JSON logging")
prevSenders = ctrl.loggingManager.SetSenders(senders)

View File

@ -445,6 +445,7 @@ type Logging interface {
// LoggingDestination describes logging destination.
type LoggingDestination interface {
Endpoint() *url.URL
ExtraTags() map[string]string
Format() string
}

View File

@ -2299,6 +2299,18 @@
"description": "Logs format.\n",
"markdownDescription": "Logs format.",
"x-intellij-html-description": "\u003cp\u003eLogs format.\u003c/p\u003e\n"
},
"extraTags": {
"patternProperties": {
".*": {
"type": "string"
}
},
"type": "object",
"title": "extraTags",
"description": "Extra tags (key-value) pairs to attach to every log message sent.\n",
"markdownDescription": "Extra tags (key-value) pairs to attach to every log message sent.",
"x-intellij-html-description": "\u003cp\u003eExtra tags (key-value) pairs to attach to every log message sent.\u003c/p\u003e\n"
}
},
"additionalProperties": false,

View File

@ -59,6 +59,11 @@ func (ld LoggingDestination) Endpoint() *url.URL {
return ld.LoggingEndpoint.URL
}
// ExtraTags implements config.LoggingDestination interface.
func (ld LoggingDestination) ExtraTags() map[string]string {
return ld.LoggingExtraTags
}
// Format implements config.LoggingDestination interface.
func (ld LoggingDestination) Format() string {
return ld.LoggingFormat

View File

@ -2392,6 +2392,9 @@ type LoggingDestination struct {
// values:
// - json_lines
LoggingFormat string `yaml:"format"`
// description: |
// Extra tags (key-value) pairs to attach to every log message sent.
LoggingExtraTags map[string]string `yaml:"extraTags,omitempty"`
}
// KernelConfig struct configures Talos Linux kernel.

View File

@ -3906,6 +3906,13 @@ func (LoggingDestination) Doc() *encoder.Doc {
"json_lines",
},
},
{
Name: "extraTags",
Type: "map[string]string",
Note: "",
Description: "Extra tags (key-value) pairs to attach to every log message sent.",
Comments: [3]string{"" /* encoder.HeadComment */, "Extra tags (key-value) pairs to attach to every log message sent." /* encoder.LineComment */, "" /* encoder.FootComment */},
},
},
}

View File

@ -1503,6 +1503,13 @@ func (in *LoggingDestination) DeepCopyInto(out *LoggingDestination) {
in, out := &in.LoggingEndpoint, &out.LoggingEndpoint
*out = (*in).DeepCopy()
}
if in.LoggingExtraTags != nil {
in, out := &in.LoggingExtraTags, &out.LoggingExtraTags
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
return
}

View File

@ -2667,6 +2667,7 @@ endpoint: udp://127.0.0.1:12345
endpoint: tcp://1.2.3.4:12345
{{< /highlight >}}</details> | |
|`format` |string |Logs format. |`json_lines`<br /> |
|`extraTags` |map[string]string |Extra tags (key-value) pairs to attach to every log message sent. | |

View File

@ -2299,6 +2299,18 @@
"description": "Logs format.\n",
"markdownDescription": "Logs format.",
"x-intellij-html-description": "\u003cp\u003eLogs format.\u003c/p\u003e\n"
},
"extraTags": {
"patternProperties": {
".*": {
"type": "string"
}
},
"type": "object",
"title": "extraTags",
"description": "Extra tags (key-value) pairs to attach to every log message sent.\n",
"markdownDescription": "Extra tags (key-value) pairs to attach to every log message sent.",
"x-intellij-html-description": "\u003cp\u003eExtra tags (key-value) pairs to attach to every log message sent.\u003c/p\u003e\n"
}
},
"additionalProperties": false,

View File

@ -89,6 +89,20 @@ Messages are newline-separated when sent over TCP.
Over UDP messages are sent with one message per packet.
`msg`, `talos-level`, `talos-service`, and `talos-time` fields are always present; there may be additional fields.
Every message sent can be enhanced with additional fields by using the `extraTags` field in the machine configuration:
```yaml
machine:
logging:
destinations:
- endpoint: "udp://127.0.0.1:12345/"
format: "json_lines"
extraTags:
server: s03-rack07
```
The specified `extraTags` are added to every message sent to the destination verbatim.
### Kernel logs
Kernel log delivery can be enabled with the `talos.logging.kernel` kernel command line argument, which can be specified