From dab30a8b9fc48078a1cefca6cc6a9caaacef5a93 Mon Sep 17 00:00:00 2001 From: Dmitriy Matrenichev Date: Thu, 6 Mar 2025 20:29:06 +0300 Subject: [PATCH] fix: ensure no goroutines escape in dns controller - Remove all reliance on finalizers. - Add `Close` method to CoreDNS `Proxy` struct. - Wait for `Runner.Serve` to complete. Signed-off-by: Dmitriy Matrenichev --- go.mod | 11 +-- go.sum | 20 +++--- hack/structprotogen/go.sum | 2 - .../network/dns_resolve_cache_test.go | 5 ++ .../pkg/controllers/network/dns_upstream.go | 32 ++++++--- .../app/machined/pkg/xcontext/xcontext.go | 28 -------- internal/pkg/dns/dns_test.go | 69 ++++++++++--------- internal/pkg/dns/manager.go | 18 ----- internal/pkg/dns/runnner.go | 61 +++++++--------- pkg/machinery/go.mod | 2 +- .../resources/network/dns_upstream.go | 36 ++++++---- 11 files changed, 124 insertions(+), 160 deletions(-) delete mode 100644 internal/app/machined/pkg/xcontext/xcontext.go diff --git a/go.mod b/go.mod index 6a5706af2..8f0315062 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ replace ( cloud.google.com/go => cloud.google.com/go v0.100.2 // forked coredns so we don't carry caddy and other stuff into the Talos - github.com/coredns/coredns => github.com/siderolabs/coredns v1.12.50 + github.com/coredns/coredns => github.com/siderolabs/coredns v1.12.51 // forked ethtool introduces missing APIs github.com/mdlayher/ethtool => github.com/siderolabs/ethtool v0.3.0 @@ -180,6 +180,7 @@ require ( go.etcd.io/etcd/client/pkg/v3 v3.5.18 go.etcd.io/etcd/client/v3 v3.5.18 go.etcd.io/etcd/etcdutl/v3 v3.5.18 + go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 go4.org/netipx v0.0.0-20231129151722-fdeea329fbba golang.org/x/net v0.36.0 @@ -327,7 +328,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_golang v1.20.5 // indirect github.com/prometheus/client_model v0.6.1 // indirect - github.com/prometheus/common v0.60.1 // indirect + github.com/prometheus/common v0.62.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/sasha-s/go-deadlock v0.3.5 // indirect @@ -351,9 +352,9 @@ require ( go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect - go.opentelemetry.io/otel v1.33.0 // indirect - go.opentelemetry.io/otel/metric v1.33.0 // indirect - go.opentelemetry.io/otel/trace v1.33.0 // indirect + go.opentelemetry.io/otel v1.34.0 // indirect + go.opentelemetry.io/otel/metric v1.34.0 // indirect + go.opentelemetry.io/otel/trace v1.34.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.35.0 // indirect golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c // indirect diff --git a/go.sum b/go.sum index b70e9a665..672bcd92f 100644 --- a/go.sum +++ b/go.sum @@ -588,8 +588,8 @@ github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/j github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= -github.com/prometheus/common v0.60.1 h1:FUas6GcOw66yB/73KC+BOZoFJmbo/1pojoILArPAaSc= -github.com/prometheus/common v0.60.1/go.mod h1:h0LYf1R1deLSKtD4Vdg8gy4RuOvENW2J/h19V5NADQw= +github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io= +github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= @@ -622,8 +622,8 @@ github.com/scaleway/scaleway-sdk-go v1.0.0-beta.32 h1:4+LP7qmsLSGbmc66m1s5dKRMBw github.com/scaleway/scaleway-sdk-go v1.0.0-beta.32/go.mod h1:kzh+BSAvpoyHHdHBCDhmSWtBc1NbLMZ2lWHqnBoxFks= github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= -github.com/siderolabs/coredns v1.12.50 h1:uOBWZErtM3pcncGj0XxHru53Xo3SND/qjEVkfHyrvks= -github.com/siderolabs/coredns v1.12.50/go.mod h1:le+OxWZF+rYtNGuYvxuOAdlrTwJAHKcuW09SPu25w2c= +github.com/siderolabs/coredns v1.12.51 h1:Ii3oLAXC4RALjmndKrDFoh1QX8GwukAvQe5VShnknhw= +github.com/siderolabs/coredns v1.12.51/go.mod h1:xbR7QxgwisIlnSGXSGJ8GgyF38QHPdp4qIoviAASrvU= github.com/siderolabs/crypto v0.5.1 h1:aZEUTZBoP8rH+0TqQAlUgazriPh89MrXf4R+th+m6ps= github.com/siderolabs/crypto v0.5.1/go.mod h1:7RHC7eUKBx6RLS2lDaNXrQ83zY9iPH/aQSTxk1I4/j4= github.com/siderolabs/discovery-api v0.1.6 h1:/LhsF1ytqFEfWwV0UKfUgn90k9fk5+rhYMJ9yeUB2yc= @@ -782,20 +782,20 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.5 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0/go.mod h1:HDBUsEjOuRC0EzKZ1bSaRGZWUBAzo+MhAcUUORSr4D0= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 h1:yd02MEjBdJkG3uabWP9apV+OuWRIXGDuJEUJbOHmCFU= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0/go.mod h1:umTcuxiv1n/s/S6/c2AT/g2CQ7u5C59sHDNmfSwgz7Q= -go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= -go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 h1:Vh5HayB/0HHfOQA7Ctx69E/Y/DcQSMPpKANYVMQ7fBA= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0/go.mod h1:cpgtDBaqD/6ok/UG0jT15/uKjAY8mRA53diogHBg3UI= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0 h1:wpMfgF8E1rkrT1Z6meFh1NDtownE9Ii3n3X2GJYjsaU= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0/go.mod h1:wAy0T/dUbs468uOlkT31xjvqQgEVXv58BRFWEgn5v/0= -go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ= -go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M= +go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= +go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= go.opentelemetry.io/otel/sdk v1.33.0 h1:iax7M131HuAm9QkZotNHEfstof92xM+N8sr3uHXc2IM= go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJNbiENQuihM= go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU= go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ= -go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= -go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= +go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= +go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v1.4.0 h1:TA9WRvW6zMwP+Ssb6fLoUIuirti1gGbP28GcKG1jgeg= go.opentelemetry.io/proto/otlp v1.4.0/go.mod h1:PPBWZIP98o2ElSqI35IHfu7hIhSwvc5N38Jw8pXuGFY= diff --git a/hack/structprotogen/go.sum b/hack/structprotogen/go.sum index 1edf80244..560a82b10 100644 --- a/hack/structprotogen/go.sum +++ b/hack/structprotogen/go.sum @@ -17,8 +17,6 @@ golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY= golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/typ.v4 v4.3.1 h1:z3vGwIIn2X8GjP589+YS8YWmpNqBFEy0w7OcGWj0NZ4= -gopkg.in/typ.v4 v4.3.1/go.mod h1:wolXe8DlewxRCjA7SOiT3zjrZ0eQJZcr8cmV6bQWJUM= gopkg.in/typ.v4 v4.4.0 h1:O9vTueEmZd0iA9DF+g2wXeNCeloN2TOpxu6FXKl3AqM= gopkg.in/typ.v4 v4.4.0/go.mod h1:wolXe8DlewxRCjA7SOiT3zjrZ0eQJZcr8cmV6bQWJUM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/app/machined/pkg/controllers/network/dns_resolve_cache_test.go b/internal/app/machined/pkg/controllers/network/dns_resolve_cache_test.go index b1db52bf3..8cba9375b 100644 --- a/internal/app/machined/pkg/controllers/network/dns_resolve_cache_test.go +++ b/internal/app/machined/pkg/controllers/network/dns_resolve_cache_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.uber.org/goleak" "go.uber.org/zap/zaptest" "github.com/siderolabs/talos/internal/app/machined/pkg/controllers/ctest" @@ -249,6 +250,8 @@ func (suite *DNSServer) TestResolveMembers() { } func TestDNSServer(t *testing.T) { + goleak.VerifyNone(t) + suite.Run(t, &DNSServer{ DefaultSuite: ctest.DefaultSuite{ Timeout: 10 * time.Second, @@ -340,6 +343,8 @@ func (suite *DNSUpstreams) TestOrder() { } func TestDNSUpstreams(t *testing.T) { + goleak.VerifyNone(t) + suite.Run(t, &DNSUpstreams{ DefaultSuite: ctest.DefaultSuite{ Timeout: 10 * time.Second, diff --git a/internal/app/machined/pkg/controllers/network/dns_upstream.go b/internal/app/machined/pkg/controllers/network/dns_upstream.go index 5f1bc8509..e992a6f9c 100644 --- a/internal/app/machined/pkg/controllers/network/dns_upstream.go +++ b/internal/app/machined/pkg/controllers/network/dns_upstream.go @@ -150,15 +150,21 @@ func existingConnections(ctx context.Context, r controller.Runtime) (func(*netwo return } + defer func(c *network.DNSConn) { + if c != nil { + c.Close() + } + }(spec.Conn) + if conn, ok := existingConn[remoteAddr]; ok { - spec.Conn = conn + spec.Conn = conn.NewRef() l.Debug("reusing existing upstream connection", zap.String("addr", remoteAddr)) return } - spec.Conn = network.NewDNSConn(proxy.NewProxy(remoteHost, remoteAddr, "dns"), l) + spec.Conn = network.NewDNSConn(proxy.NewProxy(remoteHost, remoteAddr, "dns")) l.Debug("created new upstream connection", zap.String("addr", remoteAddr)) @@ -177,14 +183,20 @@ func cleanupUpstream(ctx context.Context, r controller.Runtime, touchedIDs map[r for val := range list.All() { md := val.Metadata() - if _, ok := touchedIDs[md.ID()]; !ok { - if err = r.Destroy(ctx, md); err != nil { - l.Error("error destroying upstream", zap.Error(err), zap.String("id", md.ID())) - - return - } - - l.Debug("destroyed dns upstream", zap.String("addr", md.ID())) + if _, ok := touchedIDs[md.ID()]; ok { + continue } + + if conn := val.TypedSpec().Value.Conn; conn != nil { + conn.Close() + } + + if err = r.Destroy(ctx, md); err != nil { + l.Error("error destroying upstream", zap.Error(err), zap.String("id", md.ID())) + + return + } + + l.Debug("destroyed dns upstream", zap.String("addr", md.ID())) } } diff --git a/internal/app/machined/pkg/xcontext/xcontext.go b/internal/app/machined/pkg/xcontext/xcontext.go deleted file mode 100644 index 32dca45de..000000000 --- a/internal/app/machined/pkg/xcontext/xcontext.go +++ /dev/null @@ -1,28 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -// Package xcontext provides a small utils for context package -package xcontext - -import "context" - -// AfterFuncSync is like [context.AfterFunc] but it blocks until the function is executed. -func AfterFuncSync(ctx context.Context, fn func()) func() bool { - stopChan := make(chan struct{}) - - stop := context.AfterFunc(ctx, func() { - defer close(stopChan) - - fn() - }) - - return func() bool { - result := stop() - if !result { - <-stopChan - } - - return result - } -} diff --git a/internal/pkg/dns/dns_test.go b/internal/pkg/dns/dns_test.go index 238b705ca..126c77960 100644 --- a/internal/pkg/dns/dns_test.go +++ b/internal/pkg/dns/dns_test.go @@ -9,7 +9,6 @@ import ( "iter" "net" "net/netip" - "runtime" "slices" "strings" "testing" @@ -22,6 +21,7 @@ import ( "github.com/siderolabs/gen/xtesting/check" "github.com/stretchr/testify/require" "github.com/thejerf/suture/v4" + "go.uber.org/goleak" "go.uber.org/zap/zaptest" "github.com/siderolabs/talos/internal/pkg/dns" @@ -29,6 +29,8 @@ import ( ) func TestDNS(t *testing.T) { + goleak.VerifyNone(t) + tests := []struct { name string hostname string @@ -102,6 +104,8 @@ func TestDNS(t *testing.T) { } func TestDNSEmptyDestinations(t *testing.T) { + goleak.VerifyNone(t) + stop := newManager(t) defer stop() @@ -118,42 +122,35 @@ func TestDNSEmptyDestinations(t *testing.T) { stop() } -func TestGC_NOGC(t *testing.T) { - tests := map[string]bool{ - "ClearAll": false, - "No ClearAll": true, +func Test_ServeBackground(t *testing.T) { + goleak.VerifyNone(t) + + m := dns.NewManager(&testReader{}, func(e suture.Event) { t.Log("dns-runners event:", e) }, zaptest.NewLogger(t)) + + m.ServeBackground(t.Context()) + + // should not panic since ServeBackground is called with the same context + m.ServeBackground(t.Context()) + + // should panic since ServeBackground is called with a different context + require.Panics(t, func() { m.ServeBackground(context.TODO()) }) //nolint:usetesting + + for _, err := range m.RunAll(slices.Values([]dns.AddressPair{ + {Network: "udp", Addr: netip.MustParseAddrPort("127.0.0.1:10700")}, + {Network: "udp", Addr: netip.MustParseAddrPort("127.0.0.1:10701")}, + }), false) { + require.NoError(t, err) } - for name, f := range tests { - t.Run(name, func(t *testing.T) { - m := dns.NewManager(&testReader{}, func(e suture.Event) { t.Log("dns-runners event:", e) }, zaptest.NewLogger(t)) - - m.ServeBackground(context.Background()) //nolint:usetesting - m.ServeBackground(context.Background()) //nolint:usetesting - require.Panics(t, func() { m.ServeBackground(context.TODO()) }) //nolint:usetesting - - for _, err := range m.RunAll(slices.Values([]dns.AddressPair{ - {Network: "udp", Addr: netip.MustParseAddrPort("127.0.0.1:10700")}, - {Network: "udp", Addr: netip.MustParseAddrPort("127.0.0.1:10701")}, - }), false) { - require.NoError(t, err) - } - - require.NoError(t, m.ClearAll(f)) - - m = nil - - for range 100 { - runtime.GC() - } - }) - } + require.NoError(t, m.ClearAll(false)) } func newManager(t *testing.T, nameservers ...string) func() { - m := dns.NewManager(&testReader{}, func(e suture.Event) { - t.Log("dns-runners event:", e) - }, zaptest.NewLogger(t)) + m := dns.NewManager( + &testReader{}, + func(e suture.Event) { t.Log("dns-runners event:", e) }, + zaptest.NewLogger(t), + ) m.AllowNodeResolving(true) @@ -167,11 +164,15 @@ func newManager(t *testing.T, nameservers ...string) func() { p := proxy.NewProxy(ns, net.JoinHostPort(ns, "53"), "dns") p.Start(500 * time.Millisecond) - t.Cleanup(p.Stop) - return p }) + t.Cleanup(func() { + for _, p := range pxs { + p.Close() // We had to manually add this method to the coredns Proxy type. + } + }) + ctx, cancel := context.WithCancel(context.Background()) //nolint:usetesting t.Cleanup(cancel) diff --git a/internal/pkg/dns/manager.go b/internal/pkg/dns/manager.go index ff7d76055..4978263ad 100644 --- a/internal/pkg/dns/manager.go +++ b/internal/pkg/dns/manager.go @@ -10,7 +10,6 @@ import ( "fmt" "iter" "net/netip" - "runtime" "slices" "strings" "time" @@ -55,9 +54,6 @@ func NewManager(mr MemberReader, hook suture.EventHook, logger *zap.Logger) *Man runners: map[AddressPair]suture.ServiceToken{}, } - // If we lost ref to the manager. Ensure finalizer is called and all upstreams are collected. - runtime.SetFinalizer(m, (*Manager).finalize) - return m } @@ -192,20 +188,6 @@ func (m *Manager) clearAll() iter.Seq2[AddressPair, error] { } } -func (m *Manager) finalize() { - for data, err := range m.clearAll() { - if err != nil { - m.logger.Error("error stopping dns runner", zap.Error(err)) - } - - m.logger.Info( - "dns runner stopped from finalizer!", - zap.String("address", data.Addr.String()), - zap.String("network", data.Network), - ) - } -} - // Done reports if superwisor finished execution. func (m *Manager) Done() <-chan error { return m.supervisorCh diff --git a/internal/pkg/dns/runnner.go b/internal/pkg/dns/runnner.go index f0725de97..ca788bb84 100644 --- a/internal/pkg/dns/runnner.go +++ b/internal/pkg/dns/runnner.go @@ -9,13 +9,11 @@ import ( "errors" "io" "net" - "strings" "time" "github.com/miekg/dns" "go.uber.org/zap" - - "github.com/siderolabs/talos/internal/app/machined/pkg/xcontext" + "golang.org/x/sync/errgroup" ) // RunnerOptions is a [Runner] options. @@ -52,18 +50,17 @@ type Runner struct { logger *zap.Logger } -// Serve starts the DNS server. +// Serve starts the DNS server. Implements [suture.Service] interface. func (r *Runner) Serve(ctx context.Context) error { - detach := xcontext.AfterFuncSync(ctx, r.close) - defer func() { - if !detach() { - return - } + eg, ctx := errgroup.WithContext(ctx) - r.close() - }() + eg.Go(r.srv.ActivateAndServe) - return r.srv.ActivateAndServe() + <-ctx.Done() + + r.close() + + return eg.Wait() } func (r *Runner) close() { @@ -75,34 +72,24 @@ func (r *Runner) close() { l = l.With(zap.String("net", "udp"), zap.String("local_addr", r.srv.PacketConn.LocalAddr().String())) } - for { - err := r.srv.Shutdown() - if err != nil { - if strings.Contains(err.Error(), "server not started") { - // There a possible scenario where `go func()` not yet reached `ActivateAndServe` and yielded CPU - // time to another goroutine and then this closure reached `Shutdown`. In that case - // `dns.Server.ActivateAndServe` will actually start after `Shutdown` and this closure will block forever - // because `go func()` will never exit and close `done` channel. - continue - } + closer := io.Closer(r.srv.Listener) + if closer == nil { + closer = r.srv.PacketConn + } - l.Error("error shutting down dns server", zap.Error(err)) + if closer != nil { + if err := closer.Close(); err != nil { + l.Error("error closing dns server listener", zap.Error(err)) + } else { + l.Debug("dns server listener closed") } + } - closer := io.Closer(r.srv.Listener) - if closer == nil { - closer = r.srv.PacketConn - } + sCtx, sCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer sCancel() - if closer != nil { - err = closer.Close() - if err != nil && !errors.Is(err, net.ErrClosed) { - l.Error("error closing dns server listener", zap.Error(err)) - } else { - l.Debug("dns server listener closed") - } - } - - break + err := r.srv.ShutdownContext(sCtx) + if err != nil && !errors.Is(err, net.ErrClosed) { + l.Error("error shutting down dns server", zap.Error(err)) } } diff --git a/pkg/machinery/go.mod b/pkg/machinery/go.mod index d29bccea8..ba44df197 100644 --- a/pkg/machinery/go.mod +++ b/pkg/machinery/go.mod @@ -37,7 +37,6 @@ require ( github.com/siderolabs/net v0.4.0 github.com/siderolabs/protoenc v0.2.2 github.com/stretchr/testify v1.10.0 - go.uber.org/zap v1.27.0 google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb google.golang.org/grpc v1.70.0 @@ -73,6 +72,7 @@ require ( github.com/sasha-s/go-deadlock v0.3.5 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.35.0 // indirect golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c // indirect golang.org/x/net v0.36.0 // indirect diff --git a/pkg/machinery/resources/network/dns_upstream.go b/pkg/machinery/resources/network/dns_upstream.go index 9cc16e471..9ee12ca42 100644 --- a/pkg/machinery/resources/network/dns_upstream.go +++ b/pkg/machinery/resources/network/dns_upstream.go @@ -5,15 +5,14 @@ package network import ( - "runtime" "strconv" + "sync/atomic" "time" "github.com/cosi-project/runtime/pkg/resource" "github.com/cosi-project/runtime/pkg/resource/handle" "github.com/cosi-project/runtime/pkg/resource/meta" "github.com/cosi-project/runtime/pkg/resource/typed" - "go.uber.org/zap" ) // DNSUpstreamType is type of DNSUpstream resource. @@ -76,12 +75,13 @@ type Proxy interface { Addr() string Fails() uint32 Healthcheck() - Stop() + Close() Start(time.Duration) } // DNSConn is a wrapper around a Proxy. type DNSConn struct { + counter atomic.Int64 // Proxy is essentially a *proxy.Proxy interface. It's here because we don't want machinery to depend on coredns. // We could use a generic struct here, but without generic aliases the usage would look ugly. // Once generic aliases are here, redo the type above as `type DNSUpstream[P Proxy] = typed.Resource[...]`. @@ -89,22 +89,14 @@ type DNSConn struct { } // NewDNSConn initializes a new DNSConn. -func NewDNSConn(proxy Proxy, l *zap.Logger) *DNSConn { +func NewDNSConn(proxy Proxy) *DNSConn { proxy.Start(500 * time.Millisecond) - conn := &DNSConn{proxy: proxy} + res := &DNSConn{proxy: proxy} - // Set the finalizer to stop the proxy when the DNSConn is garbage collected. Since the proxy already uses a finalizer - // to stop the actual connections, this will not carry any noticeable performance overhead. - // - // TODO: replace with runtime.AddCleanup once https://github.com/golang/go/issues/67535 lands - runtime.SetFinalizer(conn, func(conn *DNSConn) { - conn.proxy.Stop() + res.counter.Add(1) - l.Info("dns connection garbage collected", zap.String("addr", conn.proxy.Addr())) - }) - - return conn + return res } // Addr returns the address of the DNSConn. @@ -118,3 +110,17 @@ func (u *DNSConn) Proxy() Proxy { return u.proxy } // Healthcheck kicks of a round of health checks for this DNSConn. func (u *DNSConn) Healthcheck() { u.proxy.Healthcheck() } + +// Close stops the DNSConn. +func (u *DNSConn) Close() { + if u.counter.Add(-1) == 0 { + u.proxy.Close() + } +} + +// NewRef returns a new reference to the DNSConn. +func (u *DNSConn) NewRef() *DNSConn { + u.counter.Add(1) + + return u +}