mirror of
https://github.com/siderolabs/talos.git
synced 2026-05-05 12:26:21 +02:00
fix: ensure no goroutines escape in dns controller
- Remove all reliance on finalizers. - Add `Close` method to CoreDNS `Proxy` struct. - Wait for `Runner.Serve` to complete. Signed-off-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
This commit is contained in:
parent
fce824e2f3
commit
dab30a8b9f
11
go.mod
11
go.mod
@ -7,7 +7,7 @@ replace (
|
||||
cloud.google.com/go => cloud.google.com/go v0.100.2
|
||||
|
||||
// forked coredns so we don't carry caddy and other stuff into the Talos
|
||||
github.com/coredns/coredns => github.com/siderolabs/coredns v1.12.50
|
||||
github.com/coredns/coredns => github.com/siderolabs/coredns v1.12.51
|
||||
|
||||
// forked ethtool introduces missing APIs
|
||||
github.com/mdlayher/ethtool => github.com/siderolabs/ethtool v0.3.0
|
||||
@ -180,6 +180,7 @@ require (
|
||||
go.etcd.io/etcd/client/pkg/v3 v3.5.18
|
||||
go.etcd.io/etcd/client/v3 v3.5.18
|
||||
go.etcd.io/etcd/etcdutl/v3 v3.5.18
|
||||
go.uber.org/goleak v1.3.0
|
||||
go.uber.org/zap v1.27.0
|
||||
go4.org/netipx v0.0.0-20231129151722-fdeea329fbba
|
||||
golang.org/x/net v0.36.0
|
||||
@ -327,7 +328,7 @@ require (
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
|
||||
github.com/prometheus/client_golang v1.20.5 // indirect
|
||||
github.com/prometheus/client_model v0.6.1 // indirect
|
||||
github.com/prometheus/common v0.60.1 // indirect
|
||||
github.com/prometheus/common v0.62.0 // indirect
|
||||
github.com/rivo/uniseg v0.4.7 // indirect
|
||||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||
github.com/sasha-s/go-deadlock v0.3.5 // indirect
|
||||
@ -351,9 +352,9 @@ require (
|
||||
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0 // indirect
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect
|
||||
go.opentelemetry.io/otel v1.33.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.33.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.33.0 // indirect
|
||||
go.opentelemetry.io/otel v1.34.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.34.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.34.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/crypto v0.35.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c // indirect
|
||||
|
||||
20
go.sum
20
go.sum
@ -588,8 +588,8 @@ github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/j
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
|
||||
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
|
||||
github.com/prometheus/common v0.60.1 h1:FUas6GcOw66yB/73KC+BOZoFJmbo/1pojoILArPAaSc=
|
||||
github.com/prometheus/common v0.60.1/go.mod h1:h0LYf1R1deLSKtD4Vdg8gy4RuOvENW2J/h19V5NADQw=
|
||||
github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io=
|
||||
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
|
||||
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
|
||||
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
|
||||
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
|
||||
@ -622,8 +622,8 @@ github.com/scaleway/scaleway-sdk-go v1.0.0-beta.32 h1:4+LP7qmsLSGbmc66m1s5dKRMBw
|
||||
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.32/go.mod h1:kzh+BSAvpoyHHdHBCDhmSWtBc1NbLMZ2lWHqnBoxFks=
|
||||
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
|
||||
github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
|
||||
github.com/siderolabs/coredns v1.12.50 h1:uOBWZErtM3pcncGj0XxHru53Xo3SND/qjEVkfHyrvks=
|
||||
github.com/siderolabs/coredns v1.12.50/go.mod h1:le+OxWZF+rYtNGuYvxuOAdlrTwJAHKcuW09SPu25w2c=
|
||||
github.com/siderolabs/coredns v1.12.51 h1:Ii3oLAXC4RALjmndKrDFoh1QX8GwukAvQe5VShnknhw=
|
||||
github.com/siderolabs/coredns v1.12.51/go.mod h1:xbR7QxgwisIlnSGXSGJ8GgyF38QHPdp4qIoviAASrvU=
|
||||
github.com/siderolabs/crypto v0.5.1 h1:aZEUTZBoP8rH+0TqQAlUgazriPh89MrXf4R+th+m6ps=
|
||||
github.com/siderolabs/crypto v0.5.1/go.mod h1:7RHC7eUKBx6RLS2lDaNXrQ83zY9iPH/aQSTxk1I4/j4=
|
||||
github.com/siderolabs/discovery-api v0.1.6 h1:/LhsF1ytqFEfWwV0UKfUgn90k9fk5+rhYMJ9yeUB2yc=
|
||||
@ -782,20 +782,20 @@ go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.5
|
||||
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0/go.mod h1:HDBUsEjOuRC0EzKZ1bSaRGZWUBAzo+MhAcUUORSr4D0=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 h1:yd02MEjBdJkG3uabWP9apV+OuWRIXGDuJEUJbOHmCFU=
|
||||
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0/go.mod h1:umTcuxiv1n/s/S6/c2AT/g2CQ7u5C59sHDNmfSwgz7Q=
|
||||
go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw=
|
||||
go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I=
|
||||
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
|
||||
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0 h1:Vh5HayB/0HHfOQA7Ctx69E/Y/DcQSMPpKANYVMQ7fBA=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.33.0/go.mod h1:cpgtDBaqD/6ok/UG0jT15/uKjAY8mRA53diogHBg3UI=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0 h1:wpMfgF8E1rkrT1Z6meFh1NDtownE9Ii3n3X2GJYjsaU=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.33.0/go.mod h1:wAy0T/dUbs468uOlkT31xjvqQgEVXv58BRFWEgn5v/0=
|
||||
go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ=
|
||||
go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M=
|
||||
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
|
||||
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
|
||||
go.opentelemetry.io/otel/sdk v1.33.0 h1:iax7M131HuAm9QkZotNHEfstof92xM+N8sr3uHXc2IM=
|
||||
go.opentelemetry.io/otel/sdk v1.33.0/go.mod h1:A1Q5oi7/9XaMlIWzPSxLRWOI8nG3FnzHJNbiENQuihM=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ=
|
||||
go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s=
|
||||
go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck=
|
||||
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
|
||||
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
|
||||
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
|
||||
go.opentelemetry.io/proto/otlp v1.4.0 h1:TA9WRvW6zMwP+Ssb6fLoUIuirti1gGbP28GcKG1jgeg=
|
||||
go.opentelemetry.io/proto/otlp v1.4.0/go.mod h1:PPBWZIP98o2ElSqI35IHfu7hIhSwvc5N38Jw8pXuGFY=
|
||||
|
||||
@ -17,8 +17,6 @@ golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/tools v0.30.0 h1:BgcpHewrV5AUp2G9MebG4XPFI1E2W41zU1SaqVA9vJY=
|
||||
golang.org/x/tools v0.30.0/go.mod h1:c347cR/OJfw5TI+GfX7RUPNMdDRRbjvYTS0jPyvsVtY=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/typ.v4 v4.3.1 h1:z3vGwIIn2X8GjP589+YS8YWmpNqBFEy0w7OcGWj0NZ4=
|
||||
gopkg.in/typ.v4 v4.3.1/go.mod h1:wolXe8DlewxRCjA7SOiT3zjrZ0eQJZcr8cmV6bQWJUM=
|
||||
gopkg.in/typ.v4 v4.4.0 h1:O9vTueEmZd0iA9DF+g2wXeNCeloN2TOpxu6FXKl3AqM=
|
||||
gopkg.in/typ.v4 v4.4.0/go.mod h1:wolXe8DlewxRCjA7SOiT3zjrZ0eQJZcr8cmV6bQWJUM=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
@ -22,6 +22,7 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"go.uber.org/goleak"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"github.com/siderolabs/talos/internal/app/machined/pkg/controllers/ctest"
|
||||
@ -249,6 +250,8 @@ func (suite *DNSServer) TestResolveMembers() {
|
||||
}
|
||||
|
||||
func TestDNSServer(t *testing.T) {
|
||||
goleak.VerifyNone(t)
|
||||
|
||||
suite.Run(t, &DNSServer{
|
||||
DefaultSuite: ctest.DefaultSuite{
|
||||
Timeout: 10 * time.Second,
|
||||
@ -340,6 +343,8 @@ func (suite *DNSUpstreams) TestOrder() {
|
||||
}
|
||||
|
||||
func TestDNSUpstreams(t *testing.T) {
|
||||
goleak.VerifyNone(t)
|
||||
|
||||
suite.Run(t, &DNSUpstreams{
|
||||
DefaultSuite: ctest.DefaultSuite{
|
||||
Timeout: 10 * time.Second,
|
||||
|
||||
@ -150,15 +150,21 @@ func existingConnections(ctx context.Context, r controller.Runtime) (func(*netwo
|
||||
return
|
||||
}
|
||||
|
||||
defer func(c *network.DNSConn) {
|
||||
if c != nil {
|
||||
c.Close()
|
||||
}
|
||||
}(spec.Conn)
|
||||
|
||||
if conn, ok := existingConn[remoteAddr]; ok {
|
||||
spec.Conn = conn
|
||||
spec.Conn = conn.NewRef()
|
||||
|
||||
l.Debug("reusing existing upstream connection", zap.String("addr", remoteAddr))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
spec.Conn = network.NewDNSConn(proxy.NewProxy(remoteHost, remoteAddr, "dns"), l)
|
||||
spec.Conn = network.NewDNSConn(proxy.NewProxy(remoteHost, remoteAddr, "dns"))
|
||||
|
||||
l.Debug("created new upstream connection", zap.String("addr", remoteAddr))
|
||||
|
||||
@ -177,14 +183,20 @@ func cleanupUpstream(ctx context.Context, r controller.Runtime, touchedIDs map[r
|
||||
for val := range list.All() {
|
||||
md := val.Metadata()
|
||||
|
||||
if _, ok := touchedIDs[md.ID()]; !ok {
|
||||
if err = r.Destroy(ctx, md); err != nil {
|
||||
l.Error("error destroying upstream", zap.Error(err), zap.String("id", md.ID()))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
l.Debug("destroyed dns upstream", zap.String("addr", md.ID()))
|
||||
if _, ok := touchedIDs[md.ID()]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if conn := val.TypedSpec().Value.Conn; conn != nil {
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
if err = r.Destroy(ctx, md); err != nil {
|
||||
l.Error("error destroying upstream", zap.Error(err), zap.String("id", md.ID()))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
l.Debug("destroyed dns upstream", zap.String("addr", md.ID()))
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,28 +0,0 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
// Package xcontext provides a small utils for context package
|
||||
package xcontext
|
||||
|
||||
import "context"
|
||||
|
||||
// AfterFuncSync is like [context.AfterFunc] but it blocks until the function is executed.
|
||||
func AfterFuncSync(ctx context.Context, fn func()) func() bool {
|
||||
stopChan := make(chan struct{})
|
||||
|
||||
stop := context.AfterFunc(ctx, func() {
|
||||
defer close(stopChan)
|
||||
|
||||
fn()
|
||||
})
|
||||
|
||||
return func() bool {
|
||||
result := stop()
|
||||
if !result {
|
||||
<-stopChan
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
}
|
||||
@ -9,7 +9,6 @@ import (
|
||||
"iter"
|
||||
"net"
|
||||
"net/netip"
|
||||
"runtime"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
@ -22,6 +21,7 @@ import (
|
||||
"github.com/siderolabs/gen/xtesting/check"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/thejerf/suture/v4"
|
||||
"go.uber.org/goleak"
|
||||
"go.uber.org/zap/zaptest"
|
||||
|
||||
"github.com/siderolabs/talos/internal/pkg/dns"
|
||||
@ -29,6 +29,8 @@ import (
|
||||
)
|
||||
|
||||
func TestDNS(t *testing.T) {
|
||||
goleak.VerifyNone(t)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
hostname string
|
||||
@ -102,6 +104,8 @@ func TestDNS(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDNSEmptyDestinations(t *testing.T) {
|
||||
goleak.VerifyNone(t)
|
||||
|
||||
stop := newManager(t)
|
||||
defer stop()
|
||||
|
||||
@ -118,42 +122,35 @@ func TestDNSEmptyDestinations(t *testing.T) {
|
||||
stop()
|
||||
}
|
||||
|
||||
func TestGC_NOGC(t *testing.T) {
|
||||
tests := map[string]bool{
|
||||
"ClearAll": false,
|
||||
"No ClearAll": true,
|
||||
func Test_ServeBackground(t *testing.T) {
|
||||
goleak.VerifyNone(t)
|
||||
|
||||
m := dns.NewManager(&testReader{}, func(e suture.Event) { t.Log("dns-runners event:", e) }, zaptest.NewLogger(t))
|
||||
|
||||
m.ServeBackground(t.Context())
|
||||
|
||||
// should not panic since ServeBackground is called with the same context
|
||||
m.ServeBackground(t.Context())
|
||||
|
||||
// should panic since ServeBackground is called with a different context
|
||||
require.Panics(t, func() { m.ServeBackground(context.TODO()) }) //nolint:usetesting
|
||||
|
||||
for _, err := range m.RunAll(slices.Values([]dns.AddressPair{
|
||||
{Network: "udp", Addr: netip.MustParseAddrPort("127.0.0.1:10700")},
|
||||
{Network: "udp", Addr: netip.MustParseAddrPort("127.0.0.1:10701")},
|
||||
}), false) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
for name, f := range tests {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
m := dns.NewManager(&testReader{}, func(e suture.Event) { t.Log("dns-runners event:", e) }, zaptest.NewLogger(t))
|
||||
|
||||
m.ServeBackground(context.Background()) //nolint:usetesting
|
||||
m.ServeBackground(context.Background()) //nolint:usetesting
|
||||
require.Panics(t, func() { m.ServeBackground(context.TODO()) }) //nolint:usetesting
|
||||
|
||||
for _, err := range m.RunAll(slices.Values([]dns.AddressPair{
|
||||
{Network: "udp", Addr: netip.MustParseAddrPort("127.0.0.1:10700")},
|
||||
{Network: "udp", Addr: netip.MustParseAddrPort("127.0.0.1:10701")},
|
||||
}), false) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.NoError(t, m.ClearAll(f))
|
||||
|
||||
m = nil
|
||||
|
||||
for range 100 {
|
||||
runtime.GC()
|
||||
}
|
||||
})
|
||||
}
|
||||
require.NoError(t, m.ClearAll(false))
|
||||
}
|
||||
|
||||
func newManager(t *testing.T, nameservers ...string) func() {
|
||||
m := dns.NewManager(&testReader{}, func(e suture.Event) {
|
||||
t.Log("dns-runners event:", e)
|
||||
}, zaptest.NewLogger(t))
|
||||
m := dns.NewManager(
|
||||
&testReader{},
|
||||
func(e suture.Event) { t.Log("dns-runners event:", e) },
|
||||
zaptest.NewLogger(t),
|
||||
)
|
||||
|
||||
m.AllowNodeResolving(true)
|
||||
|
||||
@ -167,11 +164,15 @@ func newManager(t *testing.T, nameservers ...string) func() {
|
||||
p := proxy.NewProxy(ns, net.JoinHostPort(ns, "53"), "dns")
|
||||
p.Start(500 * time.Millisecond)
|
||||
|
||||
t.Cleanup(p.Stop)
|
||||
|
||||
return p
|
||||
})
|
||||
|
||||
t.Cleanup(func() {
|
||||
for _, p := range pxs {
|
||||
p.Close() // We had to manually add this method to the coredns Proxy type.
|
||||
}
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background()) //nolint:usetesting
|
||||
t.Cleanup(cancel)
|
||||
|
||||
|
||||
@ -10,7 +10,6 @@ import (
|
||||
"fmt"
|
||||
"iter"
|
||||
"net/netip"
|
||||
"runtime"
|
||||
"slices"
|
||||
"strings"
|
||||
"time"
|
||||
@ -55,9 +54,6 @@ func NewManager(mr MemberReader, hook suture.EventHook, logger *zap.Logger) *Man
|
||||
runners: map[AddressPair]suture.ServiceToken{},
|
||||
}
|
||||
|
||||
// If we lost ref to the manager. Ensure finalizer is called and all upstreams are collected.
|
||||
runtime.SetFinalizer(m, (*Manager).finalize)
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
@ -192,20 +188,6 @@ func (m *Manager) clearAll() iter.Seq2[AddressPair, error] {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) finalize() {
|
||||
for data, err := range m.clearAll() {
|
||||
if err != nil {
|
||||
m.logger.Error("error stopping dns runner", zap.Error(err))
|
||||
}
|
||||
|
||||
m.logger.Info(
|
||||
"dns runner stopped from finalizer!",
|
||||
zap.String("address", data.Addr.String()),
|
||||
zap.String("network", data.Network),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Done reports if superwisor finished execution.
|
||||
func (m *Manager) Done() <-chan error {
|
||||
return m.supervisorCh
|
||||
|
||||
@ -9,13 +9,11 @@ import (
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/miekg/dns"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/siderolabs/talos/internal/app/machined/pkg/xcontext"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// RunnerOptions is a [Runner] options.
|
||||
@ -52,18 +50,17 @@ type Runner struct {
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// Serve starts the DNS server.
|
||||
// Serve starts the DNS server. Implements [suture.Service] interface.
|
||||
func (r *Runner) Serve(ctx context.Context) error {
|
||||
detach := xcontext.AfterFuncSync(ctx, r.close)
|
||||
defer func() {
|
||||
if !detach() {
|
||||
return
|
||||
}
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
r.close()
|
||||
}()
|
||||
eg.Go(r.srv.ActivateAndServe)
|
||||
|
||||
return r.srv.ActivateAndServe()
|
||||
<-ctx.Done()
|
||||
|
||||
r.close()
|
||||
|
||||
return eg.Wait()
|
||||
}
|
||||
|
||||
func (r *Runner) close() {
|
||||
@ -75,34 +72,24 @@ func (r *Runner) close() {
|
||||
l = l.With(zap.String("net", "udp"), zap.String("local_addr", r.srv.PacketConn.LocalAddr().String()))
|
||||
}
|
||||
|
||||
for {
|
||||
err := r.srv.Shutdown()
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "server not started") {
|
||||
// There a possible scenario where `go func()` not yet reached `ActivateAndServe` and yielded CPU
|
||||
// time to another goroutine and then this closure reached `Shutdown`. In that case
|
||||
// `dns.Server.ActivateAndServe` will actually start after `Shutdown` and this closure will block forever
|
||||
// because `go func()` will never exit and close `done` channel.
|
||||
continue
|
||||
}
|
||||
closer := io.Closer(r.srv.Listener)
|
||||
if closer == nil {
|
||||
closer = r.srv.PacketConn
|
||||
}
|
||||
|
||||
l.Error("error shutting down dns server", zap.Error(err))
|
||||
if closer != nil {
|
||||
if err := closer.Close(); err != nil {
|
||||
l.Error("error closing dns server listener", zap.Error(err))
|
||||
} else {
|
||||
l.Debug("dns server listener closed")
|
||||
}
|
||||
}
|
||||
|
||||
closer := io.Closer(r.srv.Listener)
|
||||
if closer == nil {
|
||||
closer = r.srv.PacketConn
|
||||
}
|
||||
sCtx, sCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer sCancel()
|
||||
|
||||
if closer != nil {
|
||||
err = closer.Close()
|
||||
if err != nil && !errors.Is(err, net.ErrClosed) {
|
||||
l.Error("error closing dns server listener", zap.Error(err))
|
||||
} else {
|
||||
l.Debug("dns server listener closed")
|
||||
}
|
||||
}
|
||||
|
||||
break
|
||||
err := r.srv.ShutdownContext(sCtx)
|
||||
if err != nil && !errors.Is(err, net.ErrClosed) {
|
||||
l.Error("error shutting down dns server", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
@ -37,7 +37,6 @@ require (
|
||||
github.com/siderolabs/net v0.4.0
|
||||
github.com/siderolabs/protoenc v0.2.2
|
||||
github.com/stretchr/testify v1.10.0
|
||||
go.uber.org/zap v1.27.0
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb
|
||||
google.golang.org/grpc v1.70.0
|
||||
@ -73,6 +72,7 @@ require (
|
||||
github.com/sasha-s/go-deadlock v0.3.5 // indirect
|
||||
github.com/stoewer/go-strcase v1.3.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
go.uber.org/zap v1.27.0 // indirect
|
||||
golang.org/x/crypto v0.35.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c // indirect
|
||||
golang.org/x/net v0.36.0 // indirect
|
||||
|
||||
@ -5,15 +5,14 @@
|
||||
package network
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/cosi-project/runtime/pkg/resource"
|
||||
"github.com/cosi-project/runtime/pkg/resource/handle"
|
||||
"github.com/cosi-project/runtime/pkg/resource/meta"
|
||||
"github.com/cosi-project/runtime/pkg/resource/typed"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// DNSUpstreamType is type of DNSUpstream resource.
|
||||
@ -76,12 +75,13 @@ type Proxy interface {
|
||||
Addr() string
|
||||
Fails() uint32
|
||||
Healthcheck()
|
||||
Stop()
|
||||
Close()
|
||||
Start(time.Duration)
|
||||
}
|
||||
|
||||
// DNSConn is a wrapper around a Proxy.
|
||||
type DNSConn struct {
|
||||
counter atomic.Int64
|
||||
// Proxy is essentially a *proxy.Proxy interface. It's here because we don't want machinery to depend on coredns.
|
||||
// We could use a generic struct here, but without generic aliases the usage would look ugly.
|
||||
// Once generic aliases are here, redo the type above as `type DNSUpstream[P Proxy] = typed.Resource[...]`.
|
||||
@ -89,22 +89,14 @@ type DNSConn struct {
|
||||
}
|
||||
|
||||
// NewDNSConn initializes a new DNSConn.
|
||||
func NewDNSConn(proxy Proxy, l *zap.Logger) *DNSConn {
|
||||
func NewDNSConn(proxy Proxy) *DNSConn {
|
||||
proxy.Start(500 * time.Millisecond)
|
||||
|
||||
conn := &DNSConn{proxy: proxy}
|
||||
res := &DNSConn{proxy: proxy}
|
||||
|
||||
// Set the finalizer to stop the proxy when the DNSConn is garbage collected. Since the proxy already uses a finalizer
|
||||
// to stop the actual connections, this will not carry any noticeable performance overhead.
|
||||
//
|
||||
// TODO: replace with runtime.AddCleanup once https://github.com/golang/go/issues/67535 lands
|
||||
runtime.SetFinalizer(conn, func(conn *DNSConn) {
|
||||
conn.proxy.Stop()
|
||||
res.counter.Add(1)
|
||||
|
||||
l.Info("dns connection garbage collected", zap.String("addr", conn.proxy.Addr()))
|
||||
})
|
||||
|
||||
return conn
|
||||
return res
|
||||
}
|
||||
|
||||
// Addr returns the address of the DNSConn.
|
||||
@ -118,3 +110,17 @@ func (u *DNSConn) Proxy() Proxy { return u.proxy }
|
||||
|
||||
// Healthcheck kicks of a round of health checks for this DNSConn.
|
||||
func (u *DNSConn) Healthcheck() { u.proxy.Healthcheck() }
|
||||
|
||||
// Close stops the DNSConn.
|
||||
func (u *DNSConn) Close() {
|
||||
if u.counter.Add(-1) == 0 {
|
||||
u.proxy.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// NewRef returns a new reference to the DNSConn.
|
||||
func (u *DNSConn) NewRef() *DNSConn {
|
||||
u.counter.Add(1)
|
||||
|
||||
return u
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user