refactor: drop apid file socket

This was yet another socket with implicit auth - remove it completely
by reworking the only use case for it - cluster-side health checks.
Now these health checks build a "regular" network Talos API client (as
they only work on controlplane nodes anyway).

Refactor the check for controlplane nodes to use resources instead of
the machine config directly (as the machine config might not always be present).

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
This commit is contained in:
Andrey Smirnov 2026-03-10 18:22:18 +04:00
parent ee53a18c8b
commit da70cedfd2
No known key found for this signature in database
GPG Key ID: 322C6F63F594CE7C
8 changed files with 132 additions and 104 deletions

View File

@ -31,7 +31,6 @@ import (
apidbackend "github.com/siderolabs/talos/internal/app/apid/pkg/backend"
"github.com/siderolabs/talos/internal/app/apid/pkg/director"
"github.com/siderolabs/talos/internal/app/apid/pkg/provider"
"github.com/siderolabs/talos/internal/pkg/selinux"
"github.com/siderolabs/talos/pkg/grpc/factory"
"github.com/siderolabs/talos/pkg/grpc/middleware/authz"
"github.com/siderolabs/talos/pkg/grpc/proxy/backend"
@ -152,19 +151,6 @@ func apidMain() error {
return fmt.Errorf("error creating listner: %w", err)
}
socketListener, err := factory.NewListener(
ctx,
factory.Network("unix"),
factory.SocketPath(constants.APISocketPath),
)
if err != nil {
return fmt.Errorf("error creating listner: %w", err)
}
if err = selinux.SetLabel(constants.APISocketPath, constants.APISocketLabel); err != nil {
return err
}
networkServer := func() *grpc.Server {
injector := &authz.Injector{
Mode: authz.Enabled,
@ -195,43 +181,12 @@ func apidMain() error {
)
}()
socketServer := func() *grpc.Server {
injector := &authz.Injector{
Mode: authz.MetadataOnly,
}
if debug.Enabled {
injector.Logger = log.New(log.Writer(), "apid/authz/injector/unix ", log.Flags()).Printf
}
return factory.NewServer(
router,
factory.WithDefaultLog(),
factory.ServerOptions(
grpc.ForceServerCodecV2(proxy.Codec()),
grpc.UnknownServiceHandler(
proxy.TransparentHandler(
router.Director,
proxy.WithStreamedDetector(router.StreamedDetector),
),
),
grpc.MaxRecvMsgSize(constants.GRPCMaxMessageSize),
),
factory.WithUnaryInterceptor(injector.UnaryInterceptor()),
factory.WithStreamInterceptor(injector.StreamInterceptor()),
)
}()
errGroup, ctx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return networkServer.Serve(networkListener)
})
errGroup.Go(func() error {
return socketServer.Serve(socketListener)
})
errGroup.Go(func() error {
return tlsConfig.Watch(ctx, onPKIUpdate)
})
@ -243,7 +198,6 @@ func apidMain() error {
defer shutdownCancel()
factory.ServerGracefulStop(networkServer, shutdownCtx)
factory.ServerGracefulStop(socketServer, shutdownCtx)
return nil
})

View File

@ -0,0 +1,38 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// Package machinehelper provides helper functions for machine-related information.
package machinehelper
import (
"context"
"fmt"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/siderolabs/talos/pkg/machinery/resources/config"
)
// CheckControlplane reports whether the API identified by apiName may be
// served on this machine, based on the MachineType resource.
//
// It returns nil when the machine type is a controlplane type. A gRPC
// Unimplemented status error is returned when the MachineType resource is
// missing or holds a non-controlplane type; any other lookup failure is
// returned wrapped.
//
// The returned error is suitable for returning directly from API handlers.
func CheckControlplane(ctx context.Context, resources state.State, apiName string) error {
	mt, lookupErr := safe.StateGetByID[*config.MachineType](ctx, resources, config.MachineTypeID)

	switch {
	case lookupErr == nil:
		// resource is present, the type itself is inspected below
	case state.IsNotFoundError(lookupErr):
		// without a machine type there is no way to tell if the API applies
		return status.Errorf(codes.Unimplemented, "machine type is not set, cannot use %s API", apiName)
	default:
		return fmt.Errorf("failed to get machine type: %w", lookupErr)
	}

	if mt.MachineType().IsControlPlane() {
		return nil
	}

	return status.Errorf(codes.Unimplemented, "%s is only available on control plane nodes", apiName)
}

View File

@ -15,10 +15,10 @@ import (
"github.com/cosi-project/runtime/pkg/safe"
"github.com/siderolabs/gen/xslices"
"google.golang.org/grpc/metadata"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/siderolabs/talos/internal/app/internal/machinehelper"
"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
"github.com/siderolabs/talos/pkg/cluster"
"github.com/siderolabs/talos/pkg/cluster/check"
@ -32,7 +32,15 @@ import (
// HealthCheck implements the cluster.ClusterServer interface.
func (s *Server) HealthCheck(in *clusterapi.HealthCheckRequest, srv clusterapi.ClusterService_HealthCheckServer) error {
clientProvider := &cluster.LocalClientProvider{}
if err := machinehelper.CheckControlplane(srv.Context(), s.Controller.Runtime().State().V1Alpha2().Resources(), "cluster health check"); err != nil {
return err
}
clientProvider := cluster.NewLocalClientProvider(
s.Controller.Runtime().State().V1Alpha2().Resources(),
// use talosconfig with same roles as incoming request for local communication
authz.GetRoles(srv.Context()),
)
defer clientProvider.Close() //nolint:errcheck
k8sProvider := &cluster.KubernetesClient{
@ -44,10 +52,6 @@ func (s *Server) HealthCheck(in *clusterapi.HealthCheckRequest, srv clusterapi.C
checkCtx, checkCtxCancel := context.WithTimeout(srv.Context(), in.WaitTimeout.AsDuration())
defer checkCtxCancel()
md := metadata.New(nil)
authz.SetMetadata(md, authz.GetRoles(srv.Context()))
checkCtx = metadata.NewOutgoingContext(checkCtx, md)
r := s.Controller.Runtime()
clusterInfo, err := buildClusterInfo(checkCtx, in, r, *k8sProvider)

View File

@ -54,6 +54,7 @@ import (
"github.com/siderolabs/talos/internal/app/debug"
"github.com/siderolabs/talos/internal/app/images"
"github.com/siderolabs/talos/internal/app/internal/machinehelper"
"github.com/siderolabs/talos/internal/app/lifecycle"
"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
"github.com/siderolabs/talos/internal/app/machined/pkg/runtime/v1alpha1/bootloader"
@ -135,17 +136,6 @@ func (s *Server) checkSupported(feature runtime.ModeCapability) error {
return nil
}
func (s *Server) checkControlplane(apiName string) error {
switch s.Controller.Runtime().Config().Machine().Type() { //nolint:exhaustive
case machinetype.TypeControlPlane:
fallthrough
case machinetype.TypeInit:
return nil
}
return status.Errorf(codes.Unimplemented, "%s is only available on control plane nodes", apiName)
}
// Register implements the factory.Registrator interface.
func (s *Server) Register(obj *grpc.Server) {
s.server = obj
@ -421,8 +411,8 @@ func (s *Server) Bootstrap(ctx context.Context, in *machine.BootstrapRequest) (r
return nil, status.Error(codes.FailedPrecondition, "bootstrap is not available yet")
}
if s.Controller.Runtime().Config().Machine().Type() == machinetype.TypeWorker {
return nil, status.Error(codes.FailedPrecondition, "bootstrap can only be performed on a control plane node")
if err := machinehelper.CheckControlplane(ctx, s.Controller.Runtime().State().V1Alpha2().Resources(), "bootstrap"); err != nil {
return nil, err
}
timeCtx, timeCtxCancel := context.WithTimeout(ctx, 5*time.Second)
@ -1212,7 +1202,7 @@ func (s *Server) Version(ctx context.Context, in *emptypb.Empty) (reply *machine
// Kubeconfig implements the machine.MachineServer interface.
func (s *Server) Kubeconfig(empty *emptypb.Empty, obj machine.MachineService_KubeconfigServer) error {
if err := s.checkControlplane("kubeconfig"); err != nil {
if err := machinehelper.CheckControlplane(obj.Context(), s.Controller.Runtime().State().V1Alpha2().Resources(), "kubeconfig"); err != nil {
return err
}
@ -1765,7 +1755,7 @@ func (s *Server) Memory(ctx context.Context, in *emptypb.Empty) (reply *machine.
// EtcdMemberList implements the machine.MachineServer interface.
func (s *Server) EtcdMemberList(ctx context.Context, in *machine.EtcdMemberListRequest) (*machine.EtcdMemberListResponse, error) {
if err := s.checkControlplane("member list"); err != nil {
if err := machinehelper.CheckControlplane(ctx, s.Controller.Runtime().State().V1Alpha2().Resources(), "member list"); err != nil {
return nil, err
}
@ -1814,7 +1804,7 @@ func (s *Server) EtcdMemberList(ctx context.Context, in *machine.EtcdMemberListR
// EtcdRemoveMemberByID implements the machine.MachineServer interface.
func (s *Server) EtcdRemoveMemberByID(ctx context.Context, in *machine.EtcdRemoveMemberByIDRequest) (*machine.EtcdRemoveMemberByIDResponse, error) {
if err := s.checkControlplane("etcd remove member"); err != nil {
if err := machinehelper.CheckControlplane(ctx, s.Controller.Runtime().State().V1Alpha2().Resources(), "etcd remove member"); err != nil {
return nil, err
}
@ -1844,7 +1834,7 @@ func (s *Server) EtcdRemoveMemberByID(ctx context.Context, in *machine.EtcdRemov
// EtcdLeaveCluster implements the machine.MachineServer interface.
func (s *Server) EtcdLeaveCluster(ctx context.Context, in *machine.EtcdLeaveClusterRequest) (*machine.EtcdLeaveClusterResponse, error) {
if err := s.checkControlplane("etcd leave"); err != nil {
if err := machinehelper.CheckControlplane(ctx, s.Controller.Runtime().State().V1Alpha2().Resources(), "etcd leave"); err != nil {
return nil, err
}
@ -1870,7 +1860,7 @@ func (s *Server) EtcdLeaveCluster(ctx context.Context, in *machine.EtcdLeaveClus
// EtcdForfeitLeadership implements the machine.MachineServer interface.
func (s *Server) EtcdForfeitLeadership(ctx context.Context, in *machine.EtcdForfeitLeadershipRequest) (*machine.EtcdForfeitLeadershipResponse, error) {
if err := s.checkControlplane("etcd forfeit leadership"); err != nil {
if err := machinehelper.CheckControlplane(ctx, s.Controller.Runtime().State().V1Alpha2().Resources(), "etcd forfeit leadership"); err != nil {
return nil, err
}
@ -1904,7 +1894,7 @@ func (s *Server) EtcdForfeitLeadership(ctx context.Context, in *machine.EtcdForf
// EtcdSnapshot implements the machine.MachineServer interface.
func (s *Server) EtcdSnapshot(in *machine.EtcdSnapshotRequest, srv machine.MachineService_EtcdSnapshotServer) error {
if err := s.checkControlplane("etcd snapshot"); err != nil {
if err := machinehelper.CheckControlplane(srv.Context(), s.Controller.Runtime().State().V1Alpha2().Resources(), "etcd snapshot"); err != nil {
return err
}
@ -1951,7 +1941,7 @@ func (s *Server) EtcdRecover(srv machine.MachineService_EtcdRecoverServer) error
return err
}
if err := s.checkControlplane("etcd recover"); err != nil {
if err := machinehelper.CheckControlplane(srv.Context(), s.Controller.Runtime().State().V1Alpha2().Resources(), "etcd recover"); err != nil {
return err
}
@ -2031,7 +2021,7 @@ func mapAlarms(alarms []*etcdserverpb.AlarmMember) []*machine.EtcdMemberAlarm {
//
// This method is available only on control plane nodes (which run etcd).
func (s *Server) EtcdAlarmList(ctx context.Context, in *emptypb.Empty) (*machine.EtcdAlarmListResponse, error) {
if err := s.checkControlplane("etcd alarm list"); err != nil {
if err := machinehelper.CheckControlplane(ctx, s.Controller.Runtime().State().V1Alpha2().Resources(), "etcd alarm list"); err != nil {
return nil, err
}
@ -2061,7 +2051,7 @@ func (s *Server) EtcdAlarmList(ctx context.Context, in *emptypb.Empty) (*machine
//
// This method is available only on control plane nodes (which run etcd).
func (s *Server) EtcdAlarmDisarm(ctx context.Context, in *emptypb.Empty) (*machine.EtcdAlarmDisarmResponse, error) {
if err := s.checkControlplane("etcd alarm list"); err != nil {
if err := machinehelper.CheckControlplane(ctx, s.Controller.Runtime().State().V1Alpha2().Resources(), "etcd alarm disarm"); err != nil {
return nil, err
}
@ -2094,7 +2084,7 @@ func (s *Server) EtcdAlarmDisarm(ctx context.Context, in *emptypb.Empty) (*machi
//
// This method is available only on control plane nodes (which run etcd).
func (s *Server) EtcdDefragment(ctx context.Context, in *emptypb.Empty) (*machine.EtcdDefragmentResponse, error) {
if err := s.checkControlplane("etcd defragment"); err != nil {
if err := machinehelper.CheckControlplane(ctx, s.Controller.Runtime().State().V1Alpha2().Resources(), "etcd defragment"); err != nil {
return nil, err
}
@ -2122,7 +2112,7 @@ func (s *Server) EtcdDefragment(ctx context.Context, in *emptypb.Empty) (*machin
//
// This method is available only on control plane nodes (which run etcd).
func (s *Server) EtcdStatus(ctx context.Context, in *emptypb.Empty) (*machine.EtcdStatusResponse, error) {
if err := s.checkControlplane("etcd status"); err != nil {
if err := machinehelper.CheckControlplane(ctx, s.Controller.Runtime().State().V1Alpha2().Resources(), "etcd status"); err != nil {
return nil, err
}
@ -2176,7 +2166,7 @@ func (s *Server) EtcdStatus(ctx context.Context, in *emptypb.Empty) (*machine.Et
//
// This method is available only on control plane nodes (which run etcd).
func (s *Server) EtcdDowngradeCancel(ctx context.Context, _ *emptypb.Empty) (*machine.EtcdDowngradeCancelResponse, error) {
if err := s.checkControlplane("etcd downgrade cancel"); err != nil {
if err := machinehelper.CheckControlplane(ctx, s.Controller.Runtime().State().V1Alpha2().Resources(), "etcd downgrade cancel"); err != nil {
return nil, err
}
@ -2210,7 +2200,7 @@ func (s *Server) EtcdDowngradeCancel(ctx context.Context, _ *emptypb.Empty) (*ma
//
//nolint:dupl
func (s *Server) EtcdDowngradeEnable(ctx context.Context, in *machine.EtcdDowngradeEnableRequest) (*machine.EtcdDowngradeEnableResponse, error) {
if err := s.checkControlplane("etcd downgrade cancel"); err != nil {
if err := machinehelper.CheckControlplane(ctx, s.Controller.Runtime().State().V1Alpha2().Resources(), "etcd downgrade enable"); err != nil {
return nil, err
}
@ -2248,7 +2238,7 @@ func (s *Server) EtcdDowngradeEnable(ctx context.Context, in *machine.EtcdDowngr
//
//nolint:dupl
func (s *Server) EtcdDowngradeValidate(ctx context.Context, in *machine.EtcdDowngradeValidateRequest) (*machine.EtcdDowngradeValidateResponse, error) {
if err := s.checkControlplane("etcd downgrade cancel"); err != nil {
if err := machinehelper.CheckControlplane(ctx, s.Controller.Runtime().State().V1Alpha2().Resources(), "etcd downgrade validate"); err != nil {
return nil, err
}
@ -2312,8 +2302,8 @@ func validateDowngrade(version string) error {
// GenerateClientConfiguration implements the machine.MachineServer interface.
func (s *Server) GenerateClientConfiguration(ctx context.Context, in *machine.GenerateClientConfigurationRequest) (*machine.GenerateClientConfigurationResponse, error) {
if s.Controller.Runtime().Config().Machine().Type() == machinetype.TypeWorker {
return nil, status.Error(codes.FailedPrecondition, "client configuration (talosconfig) can't be generated on worker nodes")
if err := machinehelper.CheckControlplane(ctx, s.Controller.Runtime().State().V1Alpha2().Resources(), "generate client configuration"); err != nil {
return nil, err
}
crtTTL := in.CrtTtl.AsDuration()

View File

@ -144,12 +144,12 @@ func (o *APID) Volumes(runtime.Runtime) []string {
//nolint:gocyclo
func (o *APID) Runner(r runtime.Runtime) (runner.Runner, error) {
// Ensure socket dir exists
if err := os.MkdirAll(filepath.Dir(constants.APISocketPath), 0o750); err != nil {
if err := os.MkdirAll(filepath.Dir(constants.APIRuntimeSocketPath), 0o750); err != nil {
return nil, err
}
// Make sure apid user owns socket directory.
if err := os.Chown(filepath.Dir(constants.APISocketPath), constants.ApidUserID, constants.ApidUserID); err != nil {
if err := os.Chown(filepath.Dir(constants.APIRuntimeSocketPath), constants.ApidUserID, constants.ApidUserID); err != nil {
return nil, err
}
@ -165,7 +165,7 @@ func (o *APID) Runner(r runtime.Runtime) (runner.Runner, error) {
mounts := []specs.Mount{
{Type: "bind", Destination: "/etc/ssl", Source: "/etc/ssl", Options: []string{"bind", "ro"}},
{Type: "bind", Destination: filepath.Dir(constants.MachineSocketPath), Source: filepath.Dir(constants.MachineSocketPath), Options: []string{"rbind", "ro"}},
{Type: "bind", Destination: filepath.Dir(constants.APISocketPath), Source: filepath.Dir(constants.APISocketPath), Options: []string{"rbind", "rw"}},
{Type: "bind", Destination: filepath.Dir(constants.APIRuntimeSocketPath), Source: filepath.Dir(constants.APIRuntimeSocketPath), Options: []string{"rbind", "rw"}},
}
mounts = bindMountContainerMarker(mounts)

View File

@ -94,7 +94,6 @@ func (suite *SELinuxSuite) TestFileMountLabels() {
"/var/run": constants.RunSelinuxLabel,
// Runtime files
constants.APIRuntimeSocketPath: constants.APIRuntimeSocketLabel,
constants.APISocketPath: constants.APISocketLabel,
constants.DBusClientSocketPath: constants.DBusClientSocketLabel,
constants.UdevRulesPath: constants.UdevRulesLabel,
constants.DBusServiceSocketPath: constants.DBusServiceSocketLabel,

View File

@ -6,18 +6,36 @@ package cluster
import (
"context"
"errors"
"fmt"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/siderolabs/talos/pkg/machinery/client"
"github.com/siderolabs/talos/pkg/machinery/constants"
clientconfig "github.com/siderolabs/talos/pkg/machinery/client/config"
secretsgen "github.com/siderolabs/talos/pkg/machinery/config/generate/secrets"
"github.com/siderolabs/talos/pkg/machinery/resources/network"
"github.com/siderolabs/talos/pkg/machinery/resources/secrets"
"github.com/siderolabs/talos/pkg/machinery/role"
)
// LocalClientProvider builds a Talos client from node-local resource state.
//
// The client authenticates with a short-lived certificate issued from the OS
// root CA for the configured roles; see Client for how endpoints are chosen.
type LocalClientProvider struct {
client *client.Client
client *client.Client
resources state.State
roles role.Set
}
// NewLocalClientProvider returns a LocalClientProvider bound to the given
// resource state and role set.
//
// The provider is usable only on controlplane nodes, because it depends on
// the root Talos API certificate being available.
func NewLocalClientProvider(resources state.State, roles role.Set) *LocalClientProvider {
	provider := new(LocalClientProvider)
	provider.resources = resources
	provider.roles = roles

	return provider
}
// Client returns Talos client instance for default (if no endpoints are given) or
@ -25,22 +43,50 @@ type LocalClientProvider struct {
//
// Client implements ClientProvider interface.
func (c *LocalClientProvider) Client(endpoints ...string) (*client.Client, error) {
if len(endpoints) > 0 {
return nil, errors.New("custom endpoints not supported with LocalClientProvider")
if c.client != nil {
return c.client, nil
}
var err error
ctx := context.TODO()
if c.client == nil {
c.client, err = client.New(
context.TODO(),
client.WithUnixSocket(constants.APISocketPath),
client.WithGRPCDialOptions(
grpc.WithTransportCredentials(insecure.NewCredentials()),
),
)
rootSecrets, err := safe.StateGetByID[*secrets.OSRoot](ctx, c.resources, secrets.OSRootID)
if err != nil {
return nil, fmt.Errorf("failed to get OS root secrets: %w", err)
}
nodeAddress, err := safe.StateGetByID[*network.NodeAddress](ctx, c.resources, network.NodeAddressDefaultID)
if err != nil {
return nil, fmt.Errorf("failed to get node address: %w", err)
}
if len(nodeAddress.TypedSpec().IPs()) == 0 {
return nil, fmt.Errorf("no node IPs found in node address")
}
if len(endpoints) == 0 {
endpoints = []string{nodeAddress.TypedSpec().IPs()[0].String()}
}
// use a short-lived certificate, as we need to connect once
const certificateTTL = 10 * time.Minute
cert, err := secretsgen.NewAdminCertificateAndKey(
time.Now(),
rootSecrets.TypedSpec().IssuingCA,
c.roles,
certificateTTL,
)
if err != nil {
return nil, fmt.Errorf("failed to generate client certificate: %w", err)
}
talosconfig := clientconfig.NewConfig("local", endpoints, rootSecrets.TypedSpec().IssuingCA.Crt, cert)
c.client, err = client.New(
ctx,
client.WithConfig(talosconfig),
)
return c.client, err
}

View File

@ -570,9 +570,6 @@ const (
// APISocketPath is the path to file socket of apid.
APISocketPath = SystemRunPath + "/apid/apid.sock"
// APISocketLabel is the SELinux label for apid socket file.
APISocketLabel = "system_u:object_r:apid_socket_t:s0"
// APIRuntimeSocketPath is the path to file socket of runtime server for apid.
APIRuntimeSocketPath = SystemRunPath + "/apid/runtime.sock"