feat: handle all goroutine panics gracefully

Convert goroutine panics to errors or error logs.

Disallow usage of `golang.org/x/sync/errgroup` package in the backend by `depguard` linter. This linter configuration depends on: https://github.com/siderolabs/kres/pull/417

Rekres the project to include the feature (also bump Go to 1.22.4), but revert `PROTOBUF_GO_VERSION` and `GRPC_GATEWAY_VERSION` manually to not break the frontend.

Disallowing the named `go` statement was not possible at the moment using existing linters, so an issue was raised in `forbidigo` for it: https://github.com/ashanbrown/forbidigo/issues/47

Closes siderolabs/omni#373.

Signed-off-by: Utku Ozdemir <utku.ozdemir@siderolabs.com>
This commit is contained in:
Utku Ozdemir 2024-06-19 23:52:52 +02:00
parent c565666113
commit 6dcfd4c979
No known key found for this signature in database
GPG Key ID: DBD13117B0A14E93
35 changed files with 361 additions and 94 deletions

View File

@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-06-03T11:20:02Z by kres f292767.
# Generated on 2024-06-19T21:17:41Z by kres 4c9f215.
# options for analysis running
run:
@ -103,6 +103,17 @@ linters-settings:
deny:
- pkg: io/ioutil
desc: "replaced by io and os packages since Go 1.16: https://tip.golang.org/doc/go1.16#ioutil"
prevent_sync_errgroup:
deny:
- desc: Use github.com/siderolabs/omni/client/pkg/panichandler.ErrGroup instead
pkg: golang.org/x/sync/errgroup
files:
- $all
- '!$test'
- '!**/integration-test/**'
- '!**/omnictl/**'
- '!**/panichandler/**'
list-mode: lax
linters:
enable-all: true

View File

@ -362,6 +362,21 @@ spec:
subdirectory: v1alpha1
genGateway: true
---
kind: golang.GolangciLint
spec:
depguardExtraRules:
prevent_sync_errgroup:
list-mode: lax
files:
- $all
- "!$test"
- "!**/integration-test/**"
- "!**/omnictl/**"
- "!**/panichandler/**"
deny:
- pkg: golang.org/x/sync/errgroup
desc: Use github.com/siderolabs/omni/client/pkg/panichandler.ErrGroup instead
---
kind: js.Build
spec:
licenseText: |

View File

@ -1,8 +1,8 @@
# syntax = docker/dockerfile-upstream:1.7.1-labs
# syntax = docker/dockerfile-upstream:1.8.0-labs
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-06-06T12:40:29Z by kres 14c10c9-dirty.
# Generated on 2024-06-19T20:43:57Z by kres 4c9f215.
ARG JS_TOOLCHAIN
ARG TOOLCHAIN
@ -16,11 +16,11 @@ FROM --platform=${BUILDPLATFORM} ${JS_TOOLCHAIN} AS js-toolchain
RUN apk --update --no-cache add bash curl protoc protobuf-dev go
COPY ./go.mod .
COPY ./go.sum .
ENV GOPATH /go
ENV PATH ${PATH}:/usr/local/go/bin
ENV GOPATH=/go
ENV PATH=${PATH}:/usr/local/go/bin
# runs markdownlint
FROM docker.io/oven/bun:1.1.12-alpine AS lint-markdown
FROM docker.io/oven/bun:1.1.13-alpine AS lint-markdown
WORKDIR /src
RUN bun i markdownlint-cli@0.41.0 sentences-per-line@0.2.1
COPY .markdownlint.json .
@ -96,14 +96,14 @@ COPY ./frontend/postcss.config.js ./postcss.config.js
# build tools
FROM --platform=${BUILDPLATFORM} toolchain AS tools
ENV GO111MODULE on
ENV GO111MODULE=on
ARG CGO_ENABLED
ENV CGO_ENABLED ${CGO_ENABLED}
ENV CGO_ENABLED=${CGO_ENABLED}
ARG GOTOOLCHAIN
ENV GOTOOLCHAIN ${GOTOOLCHAIN}
ENV GOTOOLCHAIN=${GOTOOLCHAIN}
ARG GOEXPERIMENT
ENV GOEXPERIMENT ${GOEXPERIMENT}
ENV GOPATH /go
ENV GOEXPERIMENT=${GOEXPERIMENT}
ENV GOPATH=/go
ARG GOIMPORTS_VERSION
RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/go/pkg go install golang.org/x/tools/cmd/goimports@v${GOIMPORTS_VERSION}
RUN mv /go/bin/goimports /bin
@ -259,7 +259,7 @@ RUN FILES="$(gofumpt -l client)" && test -z "${FILES}" || (echo -e "Source code
FROM base AS lint-golangci-lint
WORKDIR /src
COPY .golangci.yml .
ENV GOGC 50
ENV GOGC=50
RUN golangci-lint config verify --config .golangci.yml
RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/root/.cache/golangci-lint --mount=type=cache,target=/go/pkg golangci-lint run --config .golangci.yml
@ -267,7 +267,7 @@ RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/r
FROM base AS lint-golangci-lint-client
WORKDIR /src/client
COPY client/.golangci.yml .
ENV GOGC 50
ENV GOGC=50
RUN golangci-lint config verify --config .golangci.yml
RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/root/.cache/golangci-lint --mount=type=cache,target=/go/pkg golangci-lint run --config .golangci.yml
@ -496,6 +496,6 @@ COPY --from=omni omni-linux-${TARGETARCH} /omni
COPY --from=image-fhs / /
COPY --from=image-ca-certificates / /
COPY --from=omnictl-all / /omnictl/
LABEL org.opencontainers.image.source https://github.com/siderolabs/omni
LABEL org.opencontainers.image.source=https://github.com/siderolabs/omni
ENTRYPOINT ["/omni"]

View File

@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-06-06T10:06:36Z by kres dc18ad9-dirty.
# Generated on 2024-06-19T20:43:57Z by kres 4c9f215.
# common variables
@ -20,15 +20,15 @@ REGISTRY_AND_USERNAME ?= $(REGISTRY)/$(USERNAME)
PROTOBUF_GRPC_GATEWAY_TS_VERSION ?= 1.2.1
TESTPKGS ?= ./...
JS_BUILD_ARGS ?=
PROTOBUF_GO_VERSION ?= 1.34.1
PROTOBUF_GO_VERSION ?= 1.34.1 # reverted manually to 1.34.1
GRPC_GO_VERSION ?= 1.4.0
GRPC_GATEWAY_VERSION ?= 2.19.1
GRPC_GATEWAY_VERSION ?= 2.19.1 # reverted manually to 2.19.1
VTPROTOBUF_VERSION ?= 0.6.0
GOIMPORTS_VERSION ?= 0.21.0
GOIMPORTS_VERSION ?= 0.22.0
DEEPCOPY_VERSION ?= v0.5.6
GOLANGCILINT_VERSION ?= v1.59.0
GOLANGCILINT_VERSION ?= v1.59.1
GOFUMPT_VERSION ?= v0.6.0
GO_VERSION ?= 1.22.3
GO_VERSION ?= 1.22.4
GO_BUILDFLAGS ?=
GO_LDFLAGS ?=
CGO_ENABLED ?= 0
@ -72,7 +72,7 @@ COMMON_ARGS += --build-arg=DEEPCOPY_VERSION="$(DEEPCOPY_VERSION)"
COMMON_ARGS += --build-arg=GOLANGCILINT_VERSION="$(GOLANGCILINT_VERSION)"
COMMON_ARGS += --build-arg=GOFUMPT_VERSION="$(GOFUMPT_VERSION)"
COMMON_ARGS += --build-arg=TESTPKGS="$(TESTPKGS)"
JS_TOOLCHAIN ?= docker.io/oven/bun:1.1.12-alpine
JS_TOOLCHAIN ?= docker.io/oven/bun:1.1.13-alpine
TOOLCHAIN ?= docker.io/golang:1.22-alpine
# extra variables

View File

@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-06-03T12:34:17Z by kres f292767.
# Generated on 2024-06-19T21:17:41Z by kres 4c9f215.
# options for analysis running
run:
@ -103,6 +103,17 @@ linters-settings:
deny:
- pkg: io/ioutil
desc: "replaced by io and os packages since Go 1.16: https://tip.golang.org/doc/go1.16#ioutil"
prevent_sync_errgroup:
deny:
- desc: Use github.com/siderolabs/omni/client/pkg/panichandler.ErrGroup instead
pkg: golang.org/x/sync/errgroup
files:
- $all
- '!$test'
- '!**/integration-test/**'
- '!**/omnictl/**'
- '!**/panichandler/**'
list-mode: lax
linters:
enable-all: true

View File

@ -0,0 +1,86 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// Package panichandler provides a panic handling errgroup.
package panichandler
import (
"context"
"errors"
"fmt"
"log"
"runtime/debug"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// goroutinePanicked is the shared message: it is both the log message emitted by Go
// for recovered panics and the text of ErrPanic.
const goroutinePanicked = "goroutine panicked"

// ErrPanic is the error returned when a task panics.
var ErrPanic = errors.New(goroutinePanicked)
// ErrGroup wraps golang.org/x/sync/errgroup.Group to handle panics by turning them into errors.
// It MUST be created using NewErrGroup.
type ErrGroup struct {
	eg *errgroup.Group // underlying group; all tasks are submitted through it via Go
}
// NewErrGroup creates a new ErrGroup.
func NewErrGroup() *ErrGroup {
	var group errgroup.Group

	return &ErrGroup{eg: &group}
}
// ErrGroupWithContext creates a new ErrGroup with the given context.
//
// The returned context is derived from ctx by errgroup.WithContext, so it is
// canceled when the first task fails or when Wait returns.
func ErrGroupWithContext(ctx context.Context) (*ErrGroup, context.Context) {
	inner, groupCtx := errgroup.WithContext(ctx)

	return &ErrGroup{eg: inner}, groupCtx
}
// Go runs the given function in a goroutine, handling panics by turning them into errors.
func (eg *ErrGroup) Go(f func() error) {
	eg.eg.Go(func() (err error) {
		defer func() {
			recovered := recover()
			if recovered == nil {
				return
			}

			// Convert the panic into an error carrying the recovered value and the
			// stack trace, so callers see it from Wait instead of crashing the process.
			err = errors.Join(err, fmt.Errorf("%w: %s\n%s", ErrPanic, recovered, string(debug.Stack())))
		}()

		return f()
	})
}
// SetLimit sets the maximum number of goroutines that can run concurrently.
// It delegates directly to the wrapped errgroup.Group's SetLimit.
func (eg *ErrGroup) SetLimit(n int) {
	eg.eg.SetLimit(n)
}
// Wait waits for all goroutines to finish and returns the first error that occurred.
// Panics recovered in Go are surfaced here as errors wrapping ErrPanic.
func (eg *ErrGroup) Wait() error {
	return eg.eg.Wait()
}
// Go runs the given function in a goroutine, handling panics by logging them.
//
// This function is a panic-handling wrapper for the "go" keyword.
func Go(f func(), logger *zap.Logger) {
	go func() {
		defer func() {
			recovered := recover()
			if recovered == nil {
				return
			}

			stackTrace := string(debug.Stack())

			if logger == nil {
				// No logger supplied — fall back to the standard library logger.
				log.Printf("[fallback logger] %s: %s\n%s", goroutinePanicked, recovered, stackTrace)

				return
			}

			logger.DPanic(goroutinePanicked, zap.Any("panic", recovered), zap.String("stack", stackTrace))
		}()

		f()
	}()
}

View File

@ -0,0 +1,113 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package panichandler_test
import (
"errors"
"runtime"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"
"github.com/siderolabs/omni/client/pkg/panichandler"
)
// TestGoNoPanic verifies that a non-panicking function runs to completion
// without emitting any log entries.
func TestGoNoPanic(t *testing.T) {
	core, logs := observer.New(zapcore.InfoLevel)

	var done sync.WaitGroup

	done.Add(1)

	panichandler.Go(func() {
		done.Done()
	}, zap.New(core))

	done.Wait()

	assert.Empty(t, logs.All())
}
// TestGoLogPanic verifies that a panic inside panichandler.Go is logged at
// DPanic level and that the logged stack trace points back to this test.
func TestGoLogPanic(t *testing.T) {
	currentFile, currentFunc := trace()

	core, logs := observer.New(zapcore.InfoLevel)
	logger := zap.New(core)

	panichandler.Go(func() {
		panic("test")
	}, logger)

	require.EventuallyWithT(t, func(collect *assert.CollectT) {
		if !assert.Equal(collect, 1, logs.Len()) {
			return
		}

		entry := logs.All()[0]
		stack := entry.ContextMap()["stack"]

		t.Logf("log msg: %s stack: %s", entry.Message, stack)

		assert.Equal(collect, zapcore.DPanicLevel, entry.Level)
		assert.Contains(collect, entry.Message, panichandler.ErrPanic.Error())

		// assert stack trace
		assert.Contains(collect, stack, currentFunc)
		assert.Contains(collect, stack, currentFile)
	}, 1*time.Second, 10*time.Millisecond)
}
// TestRunErrFErrUnchanged verifies that an ordinary (non-panic) error from a
// task passes through the ErrGroup unmodified.
func TestRunErrFErrUnchanged(t *testing.T) {
	wantErr := errors.New("test error")

	group := panichandler.NewErrGroup()

	group.Go(func() error {
		return wantErr
	})

	assert.Equal(t, wantErr, group.Wait())
}
// TestRunErrFHandlePanic verifies that a panicking task surfaces as an error
// wrapping ErrPanic from Wait, and that the error carries a stack trace
// pointing back to this test.
func TestRunErrFHandlePanic(t *testing.T) {
	currentFile, currentFunc := trace()

	group := panichandler.NewErrGroup()

	group.Go(func() error {
		return nil // no err, all good
	})

	group.Go(func() error {
		panic("test")
	})

	err := group.Wait()

	t.Logf("error: %v", err)

	assert.ErrorIs(t, err, panichandler.ErrPanic)

	// assert stack trace
	assert.ErrorContains(t, err, currentFunc)
	assert.ErrorContains(t, err, currentFile)
}
// trace returns the file and function name of the caller.
func trace() (file, function string) {
pc := make([]uintptr, 15)
n := runtime.Callers(2, pc)
frames := runtime.CallersFrames(pc[:n])
frame, _ := frames.Next()
return frame.File, frame.Function
}

View File

@ -29,6 +29,7 @@ import (
"github.com/siderolabs/omni/client/pkg/constants"
authres "github.com/siderolabs/omni/client/pkg/omni/resources/auth"
omnires "github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/backend"
"github.com/siderolabs/omni/internal/backend/discovery"
"github.com/siderolabs/omni/internal/backend/dns"
@ -122,15 +123,17 @@ var rootCmd = &cobra.Command{
defer stop()
// do not use signal.NotifyContext as it doesn't support any ways to log the received signal
go func() {
panichandler.Go(func() {
s := <-signals
logger.Warn("signal received, stopping Omni", zap.String("signal", s.String()))
stop()
}()
}, logger)
go runDebugServer(ctx, logger)
panichandler.Go(func() {
runDebugServer(ctx, logger)
}, logger)
// this global context propagates into all controllers and any other background activities
ctx = actor.MarkContextAsInternalActor(ctx)

2
go.mod
View File

@ -1,6 +1,6 @@
module github.com/siderolabs/omni
go 1.22.3
go 1.22.4
replace (
// use nested module

View File

@ -46,6 +46,7 @@ import (
authres "github.com/siderolabs/omni/client/pkg/omni/resources/auth"
omnires "github.com/siderolabs/omni/client/pkg/omni/resources/omni"
ctlcfg "github.com/siderolabs/omni/client/pkg/omnictl/config"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/backend/dns"
"github.com/siderolabs/omni/internal/backend/grpc/router"
"github.com/siderolabs/omni/internal/backend/imagefactory"
@ -239,11 +240,11 @@ func (s *managementServer) MachineLogs(request *management.MachineLogsRequest, r
defer cancel()
go func() {
panichandler.Go(func() {
// connection closed, stop reading
<-response.Context().Done()
cancel()
}()
}, s.logger)
for {
line, err := logReader.ReadLine()
@ -813,9 +814,9 @@ func (s *managementServer) KubernetesSyncManifests(req *management.KubernetesSyn
errCh := make(chan error, 1)
synCh := make(chan manifests.SyncResult)
go func() {
panichandler.Go(func() {
errCh <- manifests.Sync(ctx, bootstrapManifests, cfg, req.DryRun, synCh)
}()
}, s.logger)
var updatedManifests []manifests.Manifest
@ -857,9 +858,9 @@ syncLoop:
rolloutCh := make(chan manifests.RolloutProgress)
go func() {
panichandler.Go(func() {
errCh <- manifests.WaitForRollout(ctx, cfg, updatedManifests, rolloutCh)
}()
}, s.logger)
rolloutLoop:
for {

View File

@ -16,7 +16,6 @@ import (
"github.com/cosi-project/runtime/pkg/state"
gateway "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/siderolabs/go-api-signature/pkg/message"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
@ -25,6 +24,7 @@ import (
"github.com/siderolabs/omni/client/api/common"
"github.com/siderolabs/omni/client/api/omni/resources"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/backend/grpc/router"
"github.com/siderolabs/omni/internal/backend/runtime"
)
@ -172,7 +172,7 @@ func (s *ResourceServer) Watch(in *resources.WatchRequest, serv resources.Resour
}
events := make(chan runtime.WatchResponse)
eg, ctx := errgroup.WithContext(ctx)
eg, ctx := panichandler.ErrGroupWithContext(ctx)
if in.TailEvents != 0 {
opts = append(opts, runtime.WithTailEvents(int(in.TailEvents)))

View File

@ -23,13 +23,13 @@ import (
"github.com/siderolabs/go-talos-support/support/bundle"
"github.com/siderolabs/go-talos-support/support/collectors"
"github.com/siderolabs/talos/pkg/machinery/client"
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v3"
"k8s.io/client-go/kubernetes"
"github.com/siderolabs/omni/client/api/omni/management"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/client/pkg/omni/resources/siderolink"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/backend/runtime"
kubernetesruntime "github.com/siderolabs/omni/internal/backend/runtime/kubernetes"
"github.com/siderolabs/omni/internal/pkg/auth"
@ -127,7 +127,7 @@ func (s *managementServer) GetSupportBundle(req *management.GetSupportBundleRequ
cols = append(cols, talosCollectors...)
var eg errgroup.Group
eg := panichandler.NewErrGroup()
eg.Go(func() error {
for p := range progress {

View File

@ -25,7 +25,6 @@ import (
"github.com/hashicorp/golang-lru/v2/expirable"
"github.com/prometheus/client_golang/prometheus"
"github.com/siderolabs/gen/channel"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/singleflight"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@ -40,6 +39,7 @@ import (
"github.com/siderolabs/omni/client/api/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/k8s"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/client/pkg/panichandler"
pkgruntime "github.com/siderolabs/omni/client/pkg/runtime"
"github.com/siderolabs/omni/internal/backend/oidc/external"
"github.com/siderolabs/omni/internal/backend/runtime"
@ -473,7 +473,7 @@ func (w *Watch) run(ctx context.Context, client *Client) error {
errCh := make(chan error, 1)
eg, ctx := errgroup.WithContext(ctx)
eg, ctx := panichandler.ErrGroupWithContext(ctx)
if err := informer.Informer().SetWatchErrorHandler(func(_ *toolscache.Reflector, e error) {
channel.SendWithContext(ctx, errCh, e)

View File

@ -21,12 +21,12 @@ import (
"github.com/siderolabs/gen/containers"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/siderolabs/omni/client/api/omni/specs"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/etcdbackup"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/etcdbackup/store"
)
@ -160,7 +160,7 @@ func (ctrl *EtcdBackupController) run(ctx context.Context, r controller.Runtime,
return nil
}
var eg errgroup.Group
eg := panichandler.NewErrGroup()
eg.SetLimit(ctrl.parallel)

View File

@ -11,8 +11,7 @@ import (
"fmt"
"io"
"golang.org/x/sync/errgroup"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/etcdbackup"
)
@ -32,7 +31,7 @@ func (c *Store) Upload(ctx context.Context, descr etcdbackup.Description, r io.R
ctx, cancel := context.WithCancel(ctx)
defer cancel()
eg, ctx := errgroup.WithContext(ctx)
eg, ctx := panichandler.ErrGroupWithContext(ctx)
reader, writer := io.Pipe()

View File

@ -36,6 +36,7 @@ import (
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/backend/runtime/talos"
)
@ -101,7 +102,7 @@ func (spec IdentityCollectorTaskSpec) Equal(other IdentityCollectorTaskSpec) boo
// RunTask runs the identity collector task.
//
//nolint:gocyclo,cyclop
func (spec IdentityCollectorTaskSpec) RunTask(ctx context.Context, _ *zap.Logger, notify IdentityCollectorChan) error {
func (spec IdentityCollectorTaskSpec) RunTask(ctx context.Context, logger *zap.Logger, notify IdentityCollectorChan) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -162,11 +163,11 @@ func (spec IdentityCollectorTaskSpec) RunTask(ctx context.Context, _ *zap.Logger
if runLegacyEtcdMemberIDCollector {
wg.Add(1)
go func() {
panichandler.Go(func() {
defer wg.Done()
spec.runLegacyEtcdMemberIDCollector(ctx, client, watchCh) //nolint:errcheck
}()
}, logger)
}
for {

View File

@ -14,6 +14,8 @@ import (
"github.com/cenkalti/backoff/v4"
"go.uber.org/zap"
"github.com/siderolabs/omni/client/pkg/panichandler"
)
// ID is a task ID.
@ -55,11 +57,11 @@ func (task *Task[T, S]) Start(ctx context.Context) {
ctx, task.cancel = context.WithCancel(ctx)
go func() {
panichandler.Go(func() {
defer task.wg.Done()
task.runWithRestarts(ctx)
}()
}, task.logger)
}
func (task *Task[T, S]) runWithRestarts(ctx context.Context) {

View File

@ -39,6 +39,7 @@ import (
"github.com/siderolabs/omni/client/api/omni/specs"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/backend/runtime"
"github.com/siderolabs/omni/internal/backend/runtime/kubernetes"
"github.com/siderolabs/omni/internal/pkg/config"
@ -533,19 +534,19 @@ func (ctrl *KubernetesStatusController) startWatcher(ctx context.Context, logger
w.nodeFactory.Start(ctx.Done())
w.podFactory.Start(ctx.Done())
go func() {
panichandler.Go(func() {
w.nodeFactory.WaitForCacheSync(ctx.Done())
w.nodesSynced.Store(true)
w.nodesSync(ctx, notifyCh)
}()
}, logger)
go func() {
panichandler.Go(func() {
w.podFactory.WaitForCacheSync(ctx.Done())
w.podsSynced.Store(true)
w.podsSync(ctx, notifyCh)
}()
}, logger)
if config.Config.WorkloadProxying.Enabled {
w.serviceFactory = informers.NewSharedInformerFactory(w.client.Clientset(), 0)
@ -562,12 +563,12 @@ func (ctrl *KubernetesStatusController) startWatcher(ctx context.Context, logger
w.serviceFactory.Start(ctx.Done())
go func() {
panichandler.Go(func() {
w.serviceFactory.WaitForCacheSync(ctx.Done())
w.servicesSynced.Store(true)
w.servicesSync(ctx, notifyCh)
}()
}, logger)
}
ctrl.watchers[cluster] = w

View File

@ -27,6 +27,7 @@ import (
"github.com/siderolabs/omni/client/api/omni/specs"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/backend/runtime"
"github.com/siderolabs/omni/internal/backend/runtime/kubernetes"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/helpers"
@ -208,9 +209,9 @@ func NewKubernetesUpgradeManifestStatusController() *KubernetesUpgradeManifestSt
errCh := make(chan error, 1)
resultCh := make(chan manifests.SyncResult)
go func() {
panichandler.Go(func() {
errCh <- manifests.Sync(ctx, bootstrapManifests, cfg, true, resultCh)
}()
}, logger)
for {
select {

View File

@ -18,10 +18,12 @@ import (
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/gen/channel"
"github.com/siderolabs/gen/pair/ordered"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/backend/runtime/omni/controllers/omni/etcdbackup/store"
)
@ -31,6 +33,7 @@ import (
type State struct {
CoreState state.CoreState
StoreFactory store.Factory
Logger *zap.Logger
}
// Get implements [state.CoreState] interface.
@ -181,7 +184,7 @@ func (s *State) WatchKind(ctx context.Context, kind resource.Kind, ch chan<- sta
return makeUnsupportedError(err)
}
go func() {
panichandler.Go(func() {
list, err := s.List(ctx, kind, convertedOpts...)
if err != nil {
channel.SendWithContext(ctx, ch, state.Event{
@ -204,7 +207,7 @@ func (s *State) WatchKind(ctx context.Context, kind resource.Kind, ch chan<- sta
if !channel.SendWithContext(ctx, ch, state.Event{Type: state.Bootstrapped}) {
return
}
}()
}, s.Logger)
return nil
}

View File

@ -20,6 +20,7 @@ import (
"github.com/siderolabs/gen/pair"
"github.com/siderolabs/gen/xtesting/check"
"github.com/stretchr/testify/assert"
"go.uber.org/zap/zaptest"
"github.com/siderolabs/omni/client/pkg/omni/resources"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
@ -133,6 +134,7 @@ func TestStateList(t *testing.T) {
s := &external.State{
CoreState: tt.coreState,
StoreFactory: tt.storeFactory,
Logger: zaptest.NewLogger(t),
}
list, err := s.List(context.Background(), tt.kind, tt.opts...)
@ -229,6 +231,7 @@ func TestStateGet(t *testing.T) {
s := &external.State{
CoreState: tt.coreState,
StoreFactory: tt.storeFactory,
Logger: zaptest.NewLogger(t),
}
backup, err := s.Get(context.Background(), tt.pointer, tt.opts...)

View File

@ -337,7 +337,7 @@ func getMemberState(ctx context.Context, talosConfig *omni.TalosConfig, clusterM
if err != nil {
status.Error = err.Error()
return status, nil //nolint:nilerr
return status, nil
}
for _, info := range list {

View File

@ -65,6 +65,7 @@ func NewState(ctx context.Context, params *config.Params, logger *zap.Logger, me
return &external.State{
CoreState: primaryStorageCoreState,
StoreFactory: storeFactory,
Logger: logger,
}
default:
return primaryStorageCoreState

View File

@ -31,6 +31,7 @@ import (
"google.golang.org/grpc"
"github.com/siderolabs/omni/client/pkg/constants"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/backend/logging"
"github.com/siderolabs/omni/internal/backend/runtime/keyprovider"
"github.com/siderolabs/omni/internal/pkg/config"
@ -183,7 +184,7 @@ func getEmbeddedEtcdClient(ctx context.Context, params *config.EtcdParams, logge
return fmt.Errorf("failed to start embedded etcd: %w", err)
}
go func() {
panichandler.Go(func() {
for etcdErr := range embeddedServer.Err() {
if etcdErr != nil {
logger.Error("embedded etcd error", zap.Error(etcdErr))
@ -191,7 +192,7 @@ func getEmbeddedEtcdClient(ctx context.Context, params *config.EtcdParams, logge
cancel()
}
}
}()
}, logger)
// give etcd some time to start
timer := time.NewTimer(15 * time.Second)

View File

@ -16,6 +16,7 @@ import (
"go.etcd.io/etcd/client/v3/concurrency"
"go.uber.org/zap"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/backend/logging"
)
@ -37,9 +38,9 @@ func etcdElections(ctx context.Context, client *clientv3.Client, electionKey str
campaignErrCh := make(chan error)
go func() {
panichandler.Go(func() {
campaignErrCh <- election.Campaign(ctx, campaignKey)
}()
}, logger)
logger.Info("running the etcd election campaign")
@ -78,7 +79,7 @@ campaignLoop:
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
panichandler.Go(func() {
observe := election.Observe(ctx)
observeLoop:
@ -102,7 +103,7 @@ campaignLoop:
}
}
}
}()
}, logger)
return f(ctx, client)
}

View File

@ -16,6 +16,8 @@ import (
"github.com/cosi-project/runtime/pkg/state/impl/namespaced"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"github.com/siderolabs/omni/client/pkg/panichandler"
)
// Computed is a virtual state implementation which provides virtual resources which are computed on the fly
@ -25,6 +27,7 @@ type Computed struct {
watchScheduler *DedupScheduler
activeWatches *prometheus.GaugeVec
resolveID ProducerIDTransformer
logger *zap.Logger
}
// ProducerIDTransformer maps the incoming resource id into some other id.
@ -53,6 +56,7 @@ func NewComputed(resourceType string, factory ProducerFactory, resolveID Produce
"type",
"id",
}),
logger: logger,
}
}
@ -114,13 +118,13 @@ func (st *Computed) Watch(ctx context.Context, ptr resource.Pointer, c chan<- st
activeWatches.Inc()
go func() {
panichandler.Go(func() {
<-ctx.Done()
activeWatches.Dec()
st.watchScheduler.Stop(ptr)
}()
}, st.logger)
return nil
}

View File

@ -23,6 +23,7 @@ import (
"github.com/siderolabs/omni/client/api/omni/specs"
"github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/client/pkg/omni/resources/virtual"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/backend/logging"
"github.com/siderolabs/omni/internal/backend/runtime"
"github.com/siderolabs/omni/internal/backend/runtime/kubernetes"
@ -168,7 +169,7 @@ func (ku *KubernetesUsage) Start() error {
ku.wg.Add(2)
go func() {
panichandler.Go(func() {
defer ku.wg.Done()
ku.factory.WaitForCacheSync(ku.stopCh)
@ -183,9 +184,9 @@ func (ku *KubernetesUsage) Start() error {
synced.Store(true)
sync()
}()
}, ku.logger)
go func() {
panichandler.Go(func() {
defer ku.wg.Done()
<-ku.stopCh
@ -193,7 +194,7 @@ func (ku *KubernetesUsage) Start() error {
synced.Store(false)
ku.factory.Shutdown()
}()
}, ku.logger)
return nil
}

View File

@ -15,9 +15,9 @@ import (
"github.com/siderolabs/gen/channel"
"github.com/siderolabs/gen/pair"
"github.com/siderolabs/gen/pair/ordered"
"golang.org/x/sync/errgroup"
"github.com/siderolabs/omni/client/api/omni/resources"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/client/pkg/runtime"
)
@ -27,14 +27,14 @@ func (p *proxyRuntime) Watch(ctx context.Context, responses chan<- WatchResponse
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var group errgroup.Group
eg := panichandler.NewErrGroup()
opts := NewQueryOptions(option...)
cmp := MakeWatchResponseComparator(opts.SortField, opts.SortDescending)
ch := make(chan WatchResponse)
produce := watchResponseProducer(responses, opts, cmp)
group.Go(func() error {
eg.Go(func() error {
defer cancel()
slc, err := takeSorted(ctx, ch, cmp)
@ -60,13 +60,13 @@ func (p *proxyRuntime) Watch(ctx context.Context, responses chan<- WatchResponse
}
})
group.Go(func() error {
eg.Go(func() error {
defer cancel()
return p.Runtime.Watch(ctx, ch, option...)
})
return group.Wait()
return eg.Wait()
}
func watchResponseProducer(

View File

@ -56,6 +56,7 @@ import (
"github.com/siderolabs/omni/client/pkg/omni/resources"
authres "github.com/siderolabs/omni/client/pkg/omni/resources/auth"
omnires "github.com/siderolabs/omni/client/pkg/omni/resources/omni"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/backend/debug"
"github.com/siderolabs/omni/internal/backend/dns"
"github.com/siderolabs/omni/internal/backend/factory"
@ -530,7 +531,7 @@ func (s *Server) runMachineAPI(ctx context.Context) error {
)
})
grpcutil.RunServer(groupCtx, server, lis, eg)
grpcutil.RunServer(groupCtx, server, lis, eg, s.logger)
return eg.Wait()
}
@ -859,7 +860,9 @@ func runServer(ctx context.Context, srv *server, logger *zap.Logger) error {
errCh := make(chan error, 1)
go func() { errCh <- srv.ListenAndServe() }()
panichandler.Go(func() {
errCh <- srv.ListenAndServe()
}, logger)
select {
case err := <-errCh:
@ -921,7 +924,7 @@ func runLocalResourceServer(ctx context.Context, st state.CoreState, serverOptio
logger.Info("starting local resource server")
grpcutil.RunServer(ctx, grpcServer, listener, eg)
grpcutil.RunServer(ctx, grpcServer, listener, eg, logger)
return nil
}
@ -971,7 +974,9 @@ func runGRPCServer(ctx context.Context, server *grpc.Server, transport *memconn.
errCh := make(chan error, 1)
go func() { errCh <- server.Serve(grpcListener) }()
panichandler.Go(func() {
errCh <- server.Serve(grpcListener)
}, logger)
select {
case err := <-errCh:

View File

@ -11,7 +11,7 @@ import (
"fmt"
"runtime/debug"
"golang.org/x/sync/errgroup"
"github.com/siderolabs/omni/client/pkg/panichandler"
)
// EGroup defines common interface for Group and x/sync/errgroup.Group.
@ -23,7 +23,7 @@ type EGroup interface {
// Group is wrapper around Go's x/sync/errgroup.Group. It's not a drop-in replacement for it, because
// it requires initialization with WithContext.
type Group struct {
group *errgroup.Group
group EGroup
ctx context.Context //nolint:containedctx
}
@ -33,7 +33,7 @@ type Group struct {
// returns or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
withContext, newCtx := errgroup.WithContext(ctx)
withContext, newCtx := panichandler.ErrGroupWithContext(ctx)
return &Group{group: withContext, ctx: newCtx}, newCtx
}

View File

@ -12,13 +12,15 @@ import (
"net"
"time"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/siderolabs/omni/client/pkg/panichandler"
"github.com/siderolabs/omni/internal/pkg/errgroup"
)
// RunServer starts gRPC server on top of the provided listener, stops it when the context is done.
func RunServer(ctx context.Context, server *grpc.Server, lis net.Listener, eg *errgroup.Group) {
func RunServer(ctx context.Context, server *grpc.Server, lis net.Listener, eg *errgroup.Group, logger *zap.Logger) {
eg.Go(func() error {
err := server.Serve(lis)
if !errors.Is(err, grpc.ErrServerStopped) {
@ -29,13 +31,13 @@ func RunServer(ctx context.Context, server *grpc.Server, lis net.Listener, eg *e
})
eg.Go(func() error {
serverGracefulStop(server, ctx)
serverGracefulStop(server, ctx, logger)
return nil
})
}
func serverGracefulStop(server *grpc.Server, ctx context.Context) { //nolint:revive
func serverGracefulStop(server *grpc.Server, ctx context.Context, logger *zap.Logger) { //nolint:revive
<-ctx.Done()
stopped := make(chan struct{})
@ -43,10 +45,10 @@ func serverGracefulStop(server *grpc.Server, ctx context.Context) { //nolint:rev
shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
go func() {
panichandler.Go(func() {
server.GracefulStop()
close(stopped)
}()
}, logger)
select {
case <-shutdownCtx.Done():

View File

@ -17,6 +17,8 @@ import (
"github.com/siderolabs/gen/containers"
"go.uber.org/zap"
"github.com/siderolabs/omni/client/pkg/panichandler"
)
// Server implements TCP server to receive JSON logs. It is similar to the one in Talos, except it doesn't try to parse
@ -75,11 +77,11 @@ func (srv *Server) Serve() error {
srv.wg.Add(1)
srv.m.Set(remoteAddress, conn)
go func() {
panichandler.Go(func() {
defer srv.wg.Done()
srv.handler.HandleConn(remoteAddr, conn)
srv.m.Remove(remoteAddress)
}()
}, srv.logger)
}
}

View File

@ -412,13 +412,13 @@ func (manager *Manager) startEventsGRPC(ctx context.Context, eg *errgroup.Group,
})
eventsapi.RegisterEventSinkServiceServer(server, sink)
grpcutil.RunServer(ctx, server, listener, eg)
grpcutil.RunServer(ctx, server, listener, eg, manager.logger)
}
func (manager *Manager) startTrustdGRPC(ctx context.Context, eg *errgroup.Group, listener net.Listener, serverAddr netip.Prefix) {
server := trustd.NewServer(manager.logger, manager.state, serverAddr.Addr().AsSlice()) //nolint:contextcheck
grpcutil.RunServer(ctx, server, listener, eg)
grpcutil.RunServer(ctx, server, listener, eg, manager.logger)
}
func (manager *Manager) startLogServer(ctx context.Context, eg *errgroup.Group, serverAddr netip.Prefix, logServerPort string) error {

View File

@ -148,7 +148,7 @@ func (suite *SiderolinkSuite) startManager(params sideromanager.Params) {
)
})
grpcutil.RunServer(groupCtx, server, lis, eg)
grpcutil.RunServer(groupCtx, server, lis, eg, zaptest.NewLogger(suite.T()))
suite.Require().NoError(eg.Wait())
}()

View File

@ -1 +1 @@
v0.38.0-beta.0
v0.38.0-beta.0