From fd8a64ca95fd19a2ce67949205860a5cf46b7bfb Mon Sep 17 00:00:00 2001 From: Nicolas <49951018+Kwuray@users.noreply.github.com> Date: Fri, 22 Aug 2025 11:30:09 +0200 Subject: [PATCH] Errors on receving sigterm --- pkg/server/server_entrypoint_tcp.go | 69 ++++++++++++++++++------ pkg/server/server_entrypoint_tcp_test.go | 6 +-- 2 files changed, 55 insertions(+), 20 deletions(-) diff --git a/pkg/server/server_entrypoint_tcp.go b/pkg/server/server_entrypoint_tcp.go index 0642fa7fb..eb68714d2 100644 --- a/pkg/server/server_entrypoint_tcp.go +++ b/pkg/server/server_entrypoint_tcp.go @@ -12,6 +12,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "syscall" "time" @@ -56,15 +57,19 @@ type connState struct { type httpForwarder struct { net.Listener - connChan chan net.Conn - errChan chan error + + connChan chan net.Conn + errChan chan error + closeChan chan struct{} + closeOnce sync.Once } func newHTTPForwarder(ln net.Listener) *httpForwarder { return &httpForwarder{ - Listener: ln, - connChan: make(chan net.Conn), - errChan: make(chan error), + Listener: ln, + connChan: make(chan net.Conn), + errChan: make(chan error), + closeChan: make(chan struct{}), } } @@ -76,6 +81,8 @@ func (h *httpForwarder) ServeTCP(conn tcp.WriteCloser) { // Accept retrieves a served connection in ServeTCP. func (h *httpForwarder) Accept() (net.Conn, error) { select { + case <-h.closeChan: + return nil, errors.New("listener closed") case conn := <-h.connChan: return conn, nil case err := <-h.errChan: @@ -83,6 +90,14 @@ func (h *httpForwarder) Accept() (net.Conn, error) { } } +// Close closes the wrapped listener and unblocks Accept. +func (h *httpForwarder) Close() error { + h.closeOnce.Do(func() { + close(h.closeChan) + }) + return h.Listener.Close() +} + // TCPEntryPoints holds a map of TCPEntryPoint (the entrypoint names being the keys). type TCPEntryPoints map[string]*TCPEntryPoint @@ -162,8 +177,9 @@ type TCPEntryPoint struct { tracker *connectionTracker httpServer *httpServer httpsServer *httpServer - - http3Server *http3server + http3Server *http3server + // inShutdown reports whether the Shutdown method has been called. + inShutdown atomic.Bool } // NewTCPEntryPoint creates a new TCPEntryPoint. @@ -172,31 +188,31 @@ func NewTCPEntryPoint(ctx context.Context, name string, config *static.EntryPoin listener, err := buildListener(ctx, name, config) if err != nil { - return nil, fmt.Errorf("error preparing server: %w", err) + return nil, fmt.Errorf("building listener: %w", err) } rt, err := tcprouter.NewRouter() if err != nil { - return nil, fmt.Errorf("error preparing tcp router: %w", err) + return nil, fmt.Errorf("creating TCP router: %w", err) } reqDecorator := requestdecorator.New(hostResolverConfig) - httpServer, err := createHTTPServer(ctx, listener, config, true, reqDecorator) + httpServer, err := newHTTPServer(ctx, listener, config, true, reqDecorator) if err != nil { - return nil, fmt.Errorf("error preparing http server: %w", err) + return nil, fmt.Errorf("creating HTTP server: %w", err) } rt.SetHTTPForwarder(httpServer.Forwarder) - httpsServer, err := createHTTPServer(ctx, listener, config, false, reqDecorator) + httpsServer, err := newHTTPServer(ctx, listener, config, false, reqDecorator) if err != nil { - return nil, fmt.Errorf("error preparing https server: %w", err) + return nil, fmt.Errorf("creating HTTPS server: %w", err) } h3Server, err := newHTTP3Server(ctx, name, config, httpsServer) if err != nil { - return nil, fmt.Errorf("error preparing http3 server: %w", err) + return nil, fmt.Errorf("creating HTTP3 server: %w", err) } rt.SetHTTPSForwarder(httpsServer.Forwarder) @@ -226,6 +242,11 @@ func (e *TCPEntryPoint) Start(ctx context.Context) { for { conn, err := e.listener.Accept() + // As the Shutdown method has been called, an error is expected. + // Thus, it is not necessary to log it. + if err != nil && e.inShutdown.Load() { + return + } if err != nil { logger.Error().Err(err).Send() @@ -277,6 +298,8 @@ func (e *TCPEntryPoint) Start(ctx context.Context) { func (e *TCPEntryPoint) Shutdown(ctx context.Context) { logger := log.Ctx(ctx) + e.inShutdown.Store(true) + reqAcceptGraceTimeOut := time.Duration(e.transportConfiguration.LifeCycle.RequestAcceptGraceTimeout) if reqAcceptGraceTimeOut > 0 { logger.Info().Msgf("Waiting %s for incoming requests to cease", reqAcceptGraceTimeOut) @@ -461,6 +484,18 @@ func buildProxyProtocolListener(ctx context.Context, entryPoint *static.EntryPoi return proxyListener, nil } +type onceCloseListener struct { + net.Listener + + once sync.Once + closeErr error +} + +func (oc *onceCloseListener) Close() error { + oc.once.Do(func() { oc.closeErr = oc.Listener.Close() }) + return oc.closeErr +} + func buildListener(ctx context.Context, name string, config *static.EntryPoint) (net.Listener, error) { var listener net.Listener var err error @@ -496,7 +531,7 @@ func buildListener(ctx context.Context, name string, config *static.EntryPoint) return nil, fmt.Errorf("error creating proxy protocol listener: %w", err) } } - return listener, nil + return &onceCloseListener{Listener: listener}, nil } func newConnectionTracker(openConnectionsGauge gokitmetrics.Gauge) *connectionTracker { @@ -592,7 +627,7 @@ type httpServer struct { Switcher *middlewares.HTTPHandlerSwitcher } -func createHTTPServer(ctx context.Context, ln net.Listener, configuration *static.EntryPoint, withH2c bool, reqDecorator *requestdecorator.RequestDecorator) (*httpServer, error) { +func newHTTPServer(ctx context.Context, ln net.Listener, configuration *static.EntryPoint, withH2c bool, reqDecorator *requestdecorator.RequestDecorator) (*httpServer, error) { if configuration.HTTP2.MaxConcurrentStreams < 0 { return nil, errors.New("max concurrent streams value must be greater than or equal to zero") } @@ -692,7 +727,7 @@ func createHTTPServer(ctx context.Context, ln net.Listener, configuration *stati go func() { err := serverHTTP.Serve(listener) if err != nil && !errors.Is(err, http.ErrServerClosed) { - log.Ctx(ctx).Error().Err(err).Msg("Error while starting server") + log.Ctx(ctx).Error().Err(err).Msg("Error while running HTTP server") } }() return &httpServer{ diff --git a/pkg/server/server_entrypoint_tcp_test.go b/pkg/server/server_entrypoint_tcp_test.go index 392f7c0a3..8170234a7 100644 --- a/pkg/server/server_entrypoint_tcp_test.go +++ b/pkg/server/server_entrypoint_tcp_test.go @@ -511,7 +511,7 @@ func TestNormalizePath_malformedPercentEncoding(t *testing.T) { } } -// TestPathOperations tests the whole behavior of normalizePath, and sanitizePath combined through the use of the createHTTPServer func. +// TestPathOperations tests the whole behavior of normalizePath, and sanitizePath combined through the use of the newHTTPServer func. // It aims to guarantee the server entrypoint handler is secure regarding a large variety of cases that could lead to path traversal attacks. func TestPathOperations(t *testing.T) { // Create a listener for the server. @@ -525,8 +525,8 @@ func TestPathOperations(t *testing.T) { configuration := &static.EntryPoint{} configuration.SetDefaults() - // Create the HTTP server using createHTTPServer. - server, err := createHTTPServer(t.Context(), ln, configuration, false, requestdecorator.New(nil)) + // Create the HTTP server using newHTTPServer. + server, err := newHTTPServer(t.Context(), ln, configuration, false, requestdecorator.New(nil)) require.NoError(t, err) server.Switcher.UpdateHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {