omni/internal/backend/grpc/router/server.go
Dmitriy Matrenichev a1a1d08f82
chore: bump deps
Bump github.com/siderolabs/grpc-proxy to v0.4.1 and replace deprecated calls to `grpc.CustomCodec`.

Same as in https://github.com/siderolabs/talos/pull/8999

Signed-off-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
2024-07-09 20:32:22 +03:00

62 lines
1.7 KiB
Go

// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
package router
import (
"context"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/siderolabs/grpc-proxy/proxy"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"github.com/siderolabs/omni/internal/pkg/grpcutil"
)
// Director is a gRPC proxy director: it decides how an incoming call is proxied.
//
// NewServer hands the Director method to proxy.TransparentHandler, which is
// installed as the server's unknown-service handler.
type Director interface {
// Director receives the call context and the full gRPC method name and
// returns a proxy mode, a list of proxy backends, and an error.
// NOTE(review): the meaning of each proxy.Mode value and how a non-nil
// error is surfaced to the client are defined by the grpc-proxy package —
// confirm there.
Director(ctx context.Context, fullMethodName string) (proxy.Mode, []proxy.Backend, error)
}
// NewServer builds a gRPC server that routes requests either to itself or to
// a Talos backend, as decided by the supplied router.
//
// Three built-in options are always installed first, in this order: the proxy
// codec is forced as the server codec, the proxy's transparent handler (driven
// by router.Director) becomes the unknown-service handler, and the shared
// write buffer is enabled. Caller-supplied options are appended after them.
func NewServer(router Director, options ...grpc.ServerOption) *grpc.Server {
	const builtin = 3

	forcedCodec := grpc.ForceServerCodec(proxy.Codec())
	unknownHandler := grpc.UnknownServiceHandler(proxy.TransparentHandler(router.Director))
	sharedWriteBuf := grpc.SharedWriteBuffer(true)

	// Assemble into a fresh slice so the caller's options slice is never modified.
	serverOpts := make([]grpc.ServerOption, 0, builtin+len(options))
	serverOpts = append(serverOpts, forcedCodec, unknownHandler, sharedWriteBuf)
	serverOpts = append(serverOpts, options...)

	return grpc.NewServer(serverOpts...)
}
// Interceptors returns the server option carrying the router's chain of
// stream interceptors.
//
// The chain is, in order: context tags, zap logging (messages are emitted
// through msgProducer using the given logger), user-agent capture, and
// real-peer-address capture.
func Interceptors(logger *zap.Logger) grpc.ServerOption {
	logging := grpc_zap.StreamServerInterceptor(
		logger,
		grpc_zap.WithMessageProducer(msgProducer),
	)

	chain := []grpc.StreamServerInterceptor{
		grpc_ctxtags.StreamServerInterceptor(),
		logging,
		grpcutil.StreamSetUserAgent(),
		grpcutil.StreamSetRealPeerAddress(),
	}

	return grpc.ChainStreamInterceptor(chain...)
}
// msgProducer is the message producer plugged into the zap logging
// interceptor. It forwards to grpc_zap.DefaultMessageProducer only when
// grpcutil.ShouldLog reports true for the call context; otherwise the
// message is dropped.
func msgProducer(ctx context.Context, msg string, level zapcore.Level, code codes.Code, err error, duration zapcore.Field) {
	if grpcutil.ShouldLog(ctx) {
		grpc_zap.DefaultMessageProducer(ctx, msg, level, code, err, duration)
	}
}