omni/internal/backend/grpc/grpc.go
Artem Chernyshev 1e4e303c09
feat: implement omnictl support command
Works the same way as `talosctl support` but also grabs some relevant
Omni resources to help with the diagnostics.

Uses `go-talos-support` common module to collect Talos data.

Signed-off-by: Artem Chernyshev <artem.chernyshev@talos-systems.com>
2024-03-19 14:20:46 +03:00

131 lines
3.6 KiB
Go

// Copyright (c) 2024 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
// Package grpc implements gRPC server.
package grpc
import (
"compress/gzip"
"context"
"fmt"
"math"
"net"
"net/http"
"github.com/cosi-project/runtime/pkg/state"
gateway "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/encoding/protojson"
"github.com/siderolabs/omni/client/api/talos/machine"
"github.com/siderolabs/omni/internal/backend/dns"
"github.com/siderolabs/omni/internal/backend/logging"
"github.com/siderolabs/omni/internal/backend/monitoring"
"github.com/siderolabs/omni/internal/memconn"
"github.com/siderolabs/omni/internal/pkg/compress"
"github.com/siderolabs/omni/internal/pkg/config"
"github.com/siderolabs/omni/internal/pkg/siderolink"
)
// ServiceServer is a gRPC service server.
type ServiceServer interface {
register(grpc.ServiceRegistrar)
gateway(context.Context, *gateway.ServeMux, string, []grpc.DialOption) error
}
// MakeServiceServers creates a list of service servers.
func MakeServiceServers(
state state.State,
logHandler *siderolink.LogHandler,
oidcProvider OIDCProvider,
jwtSigningKeyProvider JWTSigningKeyProvider,
dnsService *dns.Service,
logger *zap.Logger,
) ([]ServiceServer, error) {
dest, err := generateDest(config.Config.APIURL)
if err != nil {
return nil, err
}
return []ServiceServer{
&ResourceServer{},
&oidcServer{
provider: oidcProvider,
},
&managementServer{
logHandler: logHandler,
omniconfigDest: dest,
omniState: state,
dnsService: dnsService,
jwtSigningKeyProvider: jwtSigningKeyProvider,
logger: logger.With(logging.Component("management_server")),
},
&authServer{
state: state,
logger: logger.With(logging.Component("auth_server")),
},
&COSIResourceServer{
State: state,
},
}, nil
}
// New creates new grpc server and registers all routes.
func New(ctx context.Context, mux *http.ServeMux, servers []ServiceServer, transport *memconn.Transport, logger *zap.Logger, options ...grpc.ServerOption) (*grpc.Server, error) {
server := grpc.NewServer(options...)
for _, srv := range servers {
srv.register(server)
}
marshaller := &gateway.JSONPb{
MarshalOptions: protojson.MarshalOptions{
UseProtoNames: true,
UseEnumNumbers: true,
},
}
runtimeMux := gateway.NewServeMux(
gateway.WithMarshalerOption(gateway.MIMEWildcard, marshaller),
)
opts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
// we are proxying requests to ourselves, so we don't need to impose a limit
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
grpc.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return transport.Dial()
}),
grpc.WithSharedWriteBuffer(true),
}
for _, srv := range servers {
err := srv.gateway(ctx, runtimeMux, transport.Address, opts)
if err != nil {
return nil, fmt.Errorf("error registering gateway: %w", err)
}
}
if err := machine.RegisterMachineServiceHandlerFromEndpoint(ctx, runtimeMux, transport.Address, opts); err != nil {
return nil, fmt.Errorf("error registering gateway: %w", err)
}
mux.Handle("/api/",
compress.Handler(
monitoring.NewHandler(
logging.NewHandler(
http.StripPrefix("/api", runtimeMux),
logger.With(zap.String("handler", "grpc_gateway")),
),
prometheus.Labels{"handler": "grpc_gateway"},
),
gzip.DefaultCompression,
),
)
return server, nil
}