mirror of
https://github.com/traefik/traefik.git
synced 2025-08-06 22:57:14 +02:00
Provide Log Body in OTEL access Log
This commit is contained in:
parent
c0edcc09bb
commit
5d85e6d088
@ -364,37 +364,54 @@ func (h *Handler) logTheRoundTrip(ctx context.Context, logDataTable *LogData) {
|
|||||||
totalDuration := time.Now().UTC().Sub(core[StartUTC].(time.Time))
|
totalDuration := time.Now().UTC().Sub(core[StartUTC].(time.Time))
|
||||||
core[Duration] = totalDuration
|
core[Duration] = totalDuration
|
||||||
|
|
||||||
if h.keepAccessLog(status, retryAttempts, totalDuration) {
|
if !h.keepAccessLog(status, retryAttempts, totalDuration) {
|
||||||
size := logDataTable.DownstreamResponse.size
|
return
|
||||||
core[DownstreamContentSize] = size
|
|
||||||
if original, ok := core[OriginContentSize]; ok {
|
|
||||||
o64 := original.(int64)
|
|
||||||
if size != o64 && size != 0 {
|
|
||||||
core[GzipRatio] = float64(o64) / float64(size)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
core[Overhead] = totalDuration
|
|
||||||
if origin, ok := core[OriginDuration]; ok {
|
|
||||||
core[Overhead] = totalDuration - origin.(time.Duration)
|
|
||||||
}
|
|
||||||
|
|
||||||
fields := logrus.Fields{}
|
|
||||||
|
|
||||||
for k, v := range logDataTable.Core {
|
|
||||||
if h.config.Fields.Keep(strings.ToLower(k)) {
|
|
||||||
fields[k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
h.redactHeaders(logDataTable.Request.headers, fields, "request_")
|
|
||||||
h.redactHeaders(logDataTable.OriginResponse, fields, "origin_")
|
|
||||||
h.redactHeaders(logDataTable.DownstreamResponse.headers, fields, "downstream_")
|
|
||||||
|
|
||||||
h.mu.Lock()
|
|
||||||
defer h.mu.Unlock()
|
|
||||||
h.logger.WithContext(ctx).WithFields(fields).Println()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size := logDataTable.DownstreamResponse.size
|
||||||
|
core[DownstreamContentSize] = size
|
||||||
|
if original, ok := core[OriginContentSize]; ok {
|
||||||
|
o64 := original.(int64)
|
||||||
|
if size != o64 && size != 0 {
|
||||||
|
core[GzipRatio] = float64(o64) / float64(size)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
core[Overhead] = totalDuration
|
||||||
|
if origin, ok := core[OriginDuration]; ok {
|
||||||
|
core[Overhead] = totalDuration - origin.(time.Duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
fields := logrus.Fields{}
|
||||||
|
|
||||||
|
for k, v := range logDataTable.Core {
|
||||||
|
if h.config.Fields.Keep(strings.ToLower(k)) {
|
||||||
|
fields[k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
h.redactHeaders(logDataTable.Request.headers, fields, "request_")
|
||||||
|
h.redactHeaders(logDataTable.OriginResponse, fields, "origin_")
|
||||||
|
h.redactHeaders(logDataTable.DownstreamResponse.headers, fields, "downstream_")
|
||||||
|
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
|
||||||
|
entry := h.logger.WithContext(ctx).WithFields(fields)
|
||||||
|
|
||||||
|
var message string
|
||||||
|
if h.config.OTLP != nil {
|
||||||
|
// If the logger is configured to use OpenTelemetry,
|
||||||
|
// we compute the log body with the formatter.
|
||||||
|
mBytes, err := h.logger.Formatter.Format(entry)
|
||||||
|
if err != nil {
|
||||||
|
message = fmt.Sprintf("Failed to format access log entry: %v", err)
|
||||||
|
} else {
|
||||||
|
message = string(mBytes)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
entry.Println(message)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) redactHeaders(headers http.Header, fields logrus.Fields, prefix string) {
|
func (h *Handler) redactHeaders(headers http.Header, fields logrus.Fields, prefix string) {
|
||||||
|
@ -56,80 +56,121 @@ var (
|
|||||||
testStart = time.Now()
|
testStart = time.Now()
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestOTelAccessLog(t *testing.T) {
|
func TestOTelAccessLogWithBody(t *testing.T) {
|
||||||
logCh := make(chan string)
|
testCases := []struct {
|
||||||
collector := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
desc string
|
||||||
gzr, err := gzip.NewReader(r.Body)
|
format string
|
||||||
require.NoError(t, err)
|
bodyCheckFn func(*testing.T, string)
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
desc: "Common format with log body",
|
||||||
|
format: CommonFormat,
|
||||||
|
bodyCheckFn: func(t *testing.T, log string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
body, err := io.ReadAll(gzr)
|
// For common format, verify the body contains the CLF formatted string
|
||||||
require.NoError(t, err)
|
assert.Regexp(t, `"body":{"stringValue":".*- /health -.*200.*"}`, log)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "JSON format with log body",
|
||||||
|
format: JSONFormat,
|
||||||
|
bodyCheckFn: func(t *testing.T, log string) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
req := plogotlp.NewExportRequest()
|
// For JSON format, verify the body contains the JSON formatted string
|
||||||
err = req.UnmarshalProto(body)
|
assert.Regexp(t, `"body":{"stringValue":".*DownstreamStatus.*:200.*"}`, log)
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
marshalledReq, err := json.Marshal(req)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
logCh <- string(marshalledReq)
|
|
||||||
}))
|
|
||||||
t.Cleanup(collector.Close)
|
|
||||||
|
|
||||||
config := &types.AccessLog{
|
|
||||||
OTLP: &types.OTelLog{
|
|
||||||
ServiceName: "test",
|
|
||||||
ResourceAttributes: map[string]string{"resource": "attribute"},
|
|
||||||
HTTP: &types.OTelHTTP{
|
|
||||||
Endpoint: collector.URL,
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
logHandler, err := NewHandler(t.Context(), config)
|
|
||||||
require.NoError(t, err)
|
|
||||||
t.Cleanup(func() {
|
|
||||||
err := logHandler.Close()
|
|
||||||
require.NoError(t, err)
|
|
||||||
})
|
|
||||||
|
|
||||||
req := &http.Request{
|
for _, test := range testCases {
|
||||||
Header: map[string][]string{},
|
t.Run(test.desc, func(t *testing.T) {
|
||||||
URL: &url.URL{
|
t.Parallel()
|
||||||
Path: testPath,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
ctx := trace.ContextWithSpanContext(t.Context(), trace.NewSpanContext(trace.SpanContextConfig{
|
|
||||||
TraceID: trace.TraceID{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8},
|
|
||||||
SpanID: trace.SpanID{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8},
|
|
||||||
}))
|
|
||||||
req = req.WithContext(ctx)
|
|
||||||
|
|
||||||
chain := alice.New()
|
logCh := make(chan string)
|
||||||
chain = chain.Append(capture.Wrap)
|
collector := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
gzr, err := gzip.NewReader(r.Body)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Injection of the observability variables in the request context.
|
body, err := io.ReadAll(gzr)
|
||||||
chain = chain.Append(func(next http.Handler) (http.Handler, error) {
|
require.NoError(t, err)
|
||||||
return observability.WithObservabilityHandler(next, observability.Observability{
|
|
||||||
AccessLogsEnabled: true,
|
|
||||||
}), nil
|
|
||||||
})
|
|
||||||
|
|
||||||
chain = chain.Append(logHandler.AliceConstructor())
|
req := plogotlp.NewExportRequest()
|
||||||
handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
err = req.UnmarshalProto(body)
|
||||||
rw.WriteHeader(http.StatusOK)
|
require.NoError(t, err)
|
||||||
}))
|
|
||||||
require.NoError(t, err)
|
|
||||||
handler.ServeHTTP(httptest.NewRecorder(), req)
|
|
||||||
|
|
||||||
select {
|
marshalledReq, err := json.Marshal(req)
|
||||||
case <-time.After(5 * time.Second):
|
require.NoError(t, err)
|
||||||
t.Error("AccessLog not exported")
|
|
||||||
|
|
||||||
case log := <-logCh:
|
logCh <- string(marshalledReq)
|
||||||
assert.Regexp(t, `{"key":"resource","value":{"stringValue":"attribute"}}`, log)
|
}))
|
||||||
assert.Regexp(t, `{"key":"service.name","value":{"stringValue":"test"}}`, log)
|
t.Cleanup(collector.Close)
|
||||||
assert.Regexp(t, `{"key":"DownstreamStatus","value":{"intValue":"200"}}`, log)
|
|
||||||
assert.Regexp(t, `"traceId":"01020304050607080000000000000000","spanId":"0102030405060708"`, log)
|
config := &types.AccessLog{
|
||||||
|
Format: test.format,
|
||||||
|
OTLP: &types.OTelLog{
|
||||||
|
ServiceName: "test",
|
||||||
|
ResourceAttributes: map[string]string{"resource": "attribute"},
|
||||||
|
HTTP: &types.OTelHTTP{
|
||||||
|
Endpoint: collector.URL,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
logHandler, err := NewHandler(t.Context(), config)
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
err := logHandler.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
req := &http.Request{
|
||||||
|
Header: map[string][]string{},
|
||||||
|
URL: &url.URL{
|
||||||
|
Path: "/health",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ctx := trace.ContextWithSpanContext(t.Context(), trace.NewSpanContext(trace.SpanContextConfig{
|
||||||
|
TraceID: trace.TraceID{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8},
|
||||||
|
SpanID: trace.SpanID{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8},
|
||||||
|
}))
|
||||||
|
req = req.WithContext(ctx)
|
||||||
|
|
||||||
|
chain := alice.New()
|
||||||
|
chain = chain.Append(capture.Wrap)
|
||||||
|
|
||||||
|
// Injection of the observability variables in the request context.
|
||||||
|
chain = chain.Append(func(next http.Handler) (http.Handler, error) {
|
||||||
|
return observability.WithObservabilityHandler(next, observability.Observability{
|
||||||
|
AccessLogsEnabled: true,
|
||||||
|
}), nil
|
||||||
|
})
|
||||||
|
|
||||||
|
chain = chain.Append(logHandler.AliceConstructor())
|
||||||
|
handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||||
|
rw.WriteHeader(http.StatusOK)
|
||||||
|
}))
|
||||||
|
require.NoError(t, err)
|
||||||
|
handler.ServeHTTP(httptest.NewRecorder(), req)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Error("AccessLog not exported")
|
||||||
|
|
||||||
|
case log := <-logCh:
|
||||||
|
// Verify basic OTLP structure
|
||||||
|
assert.Regexp(t, `{"key":"resource","value":{"stringValue":"attribute"}}`, log)
|
||||||
|
assert.Regexp(t, `{"key":"service.name","value":{"stringValue":"test"}}`, log)
|
||||||
|
assert.Regexp(t, `{"key":"DownstreamStatus","value":{"intValue":"200"}}`, log)
|
||||||
|
assert.Regexp(t, `"traceId":"01020304050607080000000000000000","spanId":"0102030405060708"`, log)
|
||||||
|
|
||||||
|
// Most importantly, verify the log body is populated (not empty)
|
||||||
|
assert.NotRegexp(t, `"body":{"stringValue":""}`, log, "Log body should not be empty when OTLP is configured")
|
||||||
|
|
||||||
|
// Run format-specific body checks
|
||||||
|
test.bodyCheckFn(t, log)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user