feat: split routerd from apid

New service `routerd` performs exactly single task: based on incoming
API call service name, it routes the requests to the appropriate Talos
service (`networkd`, `osd`, etc.) Service `routerd` listens of file
socket and routes requests to file sockets.

Service `apid` now does single task as well:

* it either fans out request to other `apid` services running on other
nodes and aggregates responses
* or it forwards requests to local `routerd` (when request destination
is local node)

Cons:

* one more proxying layer on request path

Pros:

* more clear service roles
* `routerd` is part of core Talos, services should register with it to
expose their API; no auth in the service (not exposed to the world)
* `apid` might be replaced with other implementation, it depends on TLS infra,
auth, etc.
* `apid` is better segregated from other Talos services (can only access
`routerd`, can't talk to other Talos services directly, so less exposure
in case of a bug)

This change is no-op to the end users.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
This commit is contained in:
Andrey Smirnov 2020-03-04 22:25:27 +03:00 committed by Andrey Smirnov
parent 856386a788
commit a068acfbe4
16 changed files with 411 additions and 80 deletions

View File

@ -196,6 +196,24 @@ WORKDIR /scratch
RUN printf "FROM scratch\nCOPY ./networkd /networkd\nENTRYPOINT [\"/networkd\"]" > Dockerfile
RUN --security=insecure img build --tag ${USERNAME}/networkd:${TAG} --output type=docker,dest=/networkd.tar --no-console .
# The routerd target builds the routerd image.
FROM base AS routerd-build
ARG SHA
ARG TAG
ARG VERSION_PKG="github.com/talos-systems/talos/internal/pkg/version"
WORKDIR /src/internal/app/routerd
RUN --mount=type=cache,target=/.cache/go-build go build -ldflags "-s -w -X ${VERSION_PKG}.Name=Server -X ${VERSION_PKG}.SHA=${SHA} -X ${VERSION_PKG}.Tag=${TAG}" -o /routerd
RUN chmod +x /routerd
FROM base AS routerd-image
ARG TAG
ARG USERNAME
COPY --from=routerd-build /routerd /scratch/routerd
WORKDIR /scratch
RUN printf "FROM scratch\nCOPY ./routerd /routerd\nENTRYPOINT [\"/routerd\"]" > Dockerfile
RUN --security=insecure img build --tag ${USERNAME}/routerd:${TAG} --output type=docker,dest=/routerd.tar --no-console .
# The osctl targets build the osctl binaries.
@ -255,6 +273,7 @@ COPY --from=ntpd-image /ntpd.tar /rootfs/usr/images/
COPY --from=osd-image /osd.tar /rootfs/usr/images/
COPY --from=trustd-image /trustd.tar /rootfs/usr/images/
COPY --from=networkd-image /networkd.tar /rootfs/usr/images/
COPY --from=routerd-image /routerd.tar /rootfs/usr/images/
# NB: We run the cleanup step before creating extra directories, files, and
# symlinks to avoid accidentally cleaning them up.
COPY ./hack/cleanup.sh /toolchain/bin/cleanup.sh

View File

@ -14,13 +14,14 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"github.com/talos-systems/talos/internal/app/apid/pkg/backend"
apidbackend "github.com/talos-systems/talos/internal/app/apid/pkg/backend"
"github.com/talos-systems/talos/internal/app/apid/pkg/director"
"github.com/talos-systems/talos/internal/app/apid/pkg/provider"
"github.com/talos-systems/talos/internal/pkg/runtime"
"github.com/talos-systems/talos/pkg/config"
"github.com/talos-systems/talos/pkg/constants"
"github.com/talos-systems/talos/pkg/grpc/factory"
"github.com/talos-systems/talos/pkg/grpc/proxy/backend"
"github.com/talos-systems/talos/pkg/startup"
)
@ -63,13 +64,10 @@ func main() {
log.Fatalf("failed to create client TLS config: %v", err)
}
backendFactory := backend.NewAPIDFactory(clientTLSConfig)
router := director.NewRouter(backendFactory.Get)
backendFactory := apidbackend.NewAPIDFactory(clientTLSConfig)
localBackend := backend.NewLocal("routerd", constants.RouterdSocketPath)
router.RegisterLocalBackend("os.OSService", backend.NewLocal("osd", constants.OSSocketPath))
router.RegisterLocalBackend("machine.MachineService", backend.NewLocal("machined", constants.MachineSocketPath))
router.RegisterLocalBackend("time.TimeService", backend.NewLocal("timed", constants.TimeSocketPath))
router.RegisterLocalBackend("network.NetworkService", backend.NewLocal("networkd", constants.NetworkSocketPath))
router := director.NewRouter(backendFactory.Get, localBackend)
// all existing streaming methods
for _, methodName := range []string{

View File

@ -7,9 +7,7 @@ package director
import (
"context"
"fmt"
"regexp"
"strings"
"github.com/talos-systems/grpc-proxy/proxy"
"google.golang.org/grpc"
@ -20,7 +18,7 @@ import (
// Router wraps grpc-proxy StreamDirector
type Router struct {
localBackends map[string]proxy.Backend
localBackend proxy.Backend
remoteBackendFactory RemoteBackendFactory
streamedMatchers []*regexp.Regexp
}
@ -29,9 +27,9 @@ type Router struct {
type RemoteBackendFactory func(target string) (proxy.Backend, error)
// NewRouter builds new Router
func NewRouter(backendFactory RemoteBackendFactory) *Router {
func NewRouter(backendFactory RemoteBackendFactory, localBackend proxy.Backend) *Router {
return &Router{
localBackends: map[string]proxy.Backend{},
localBackend: localBackend,
remoteBackendFactory: backendFactory,
}
}
@ -46,37 +44,23 @@ func (r *Router) Register(srv *grpc.Server) {
func (r *Router) Director(ctx context.Context, fullMethodName string) (proxy.Mode, []proxy.Backend, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return r.localDirector(fullMethodName)
return proxy.One2One, []proxy.Backend{r.localBackend}, nil
}
if _, exists := md["proxyfrom"]; exists {
return r.localDirector(fullMethodName)
return proxy.One2One, []proxy.Backend{r.localBackend}, nil
}
var targets []string
if targets, ok = md["nodes"]; !ok {
// send directly to local node, skips another layer of proxying
return r.localDirector(fullMethodName)
return proxy.One2One, []proxy.Backend{r.localBackend}, nil
}
return r.aggregateDirector(targets)
}
// localDirector sends requests down to local service in one2one mode.
//
// Local backends are registered via RegisterLocalBackend
func (r *Router) localDirector(fullMethodName string) (proxy.Mode, []proxy.Backend, error) {
parts := strings.SplitN(fullMethodName, "/", 3)
serviceName := parts[1]
if backend, ok := r.localBackends[serviceName]; ok {
return proxy.One2One, []proxy.Backend{backend}, nil
}
return proxy.One2One, nil, status.Errorf(codes.Unknown, "service %v is not defined", serviceName)
}
// aggregateDirector sends request across set of remote instances and aggregates results.
func (r *Router) aggregateDirector(targets []string) (proxy.Mode, []proxy.Backend, error) {
var err error
@ -93,15 +77,6 @@ func (r *Router) aggregateDirector(targets []string) (proxy.Mode, []proxy.Backen
return proxy.One2Many, backends, nil
}
// RegisterLocalBackend registers local backend by service name.
func (r *Router) RegisterLocalBackend(serviceName string, backend proxy.Backend) {
if _, exists := r.localBackends[serviceName]; exists {
panic(fmt.Sprintf("local backend %v already registered", serviceName))
}
r.localBackends[serviceName] = backend
}
// StreamedDetector implements proxy.StreamedDetector.
func (r *Router) StreamedDetector(fullMethodName string) bool {
for _, re := range r.streamedMatchers {

View File

@ -23,14 +23,7 @@ type DirectorSuite struct {
}
func (suite *DirectorSuite) SetupSuite() {
suite.router = director.NewRouter(mockBackendFactory)
}
func (suite *DirectorSuite) TestRegisterLocalBackend() {
suite.router.RegisterLocalBackend("a.A", &mockBackend{})
suite.router.RegisterLocalBackend("b.B", &mockBackend{})
suite.Require().Panics(func() { suite.router.RegisterLocalBackend("a.A", &mockBackend{}) })
suite.router = director.NewRouter(mockBackendFactory, &mockBackend{})
}
func (suite *DirectorSuite) TestStreamedDetector() {
@ -48,37 +41,6 @@ func (suite *DirectorSuite) TestStreamedDetector() {
suite.Assert().False(suite.router.StreamedDetector("/service.Service/getStreamItem"))
}
func (suite *DirectorSuite) TestDirectorLocal() {
ctx := context.Background()
mode, backends, err := suite.router.Director(ctx, "/service.Service/method")
suite.Assert().Equal(proxy.One2One, mode)
suite.Assert().Nil(backends)
suite.Assert().EqualError(err, "rpc error: code = Unknown desc = service service.Service is not defined")
suite.router.RegisterLocalBackend("service.Service", &mockBackend{target: "local"})
mode, backends, err = suite.router.Director(ctx, "/service.Service/method")
suite.Assert().Equal(proxy.One2One, mode)
suite.Assert().Len(backends, 1)
suite.Assert().Equal("local", backends[0].(*mockBackend).target)
suite.Assert().NoError(err)
ctxProxyFrom := metadata.NewIncomingContext(ctx, metadata.Pairs("proxyfrom", "127.0.0.1"))
mode, backends, err = suite.router.Director(ctxProxyFrom, "/service.Service/method")
suite.Assert().Equal(proxy.One2One, mode)
suite.Assert().Len(backends, 1)
suite.Assert().Equal("local", backends[0].(*mockBackend).target)
suite.Assert().NoError(err)
ctxNoTargets := metadata.NewIncomingContext(ctx, metadata.Pairs(":authority", "127.0.0.1"))
mode, backends, err = suite.router.Director(ctxNoTargets, "/service.Service/method")
suite.Assert().Equal(proxy.One2One, mode)
suite.Assert().Len(backends, 1)
suite.Assert().Equal("local", backends[0].(*mockBackend).target)
suite.Assert().NoError(err)
}
func (suite *DirectorSuite) TestDirectorAggregate() {
ctx := context.Background()

View File

@ -43,6 +43,7 @@ func (task *StartServices) loadSystemServices(r runtime.Runtime) {
&services.APID{},
&services.OSD{},
&services.Networkd{},
&services.Routerd{},
)
if r.Platform().Mode() != runtime.Container {

View File

@ -9,6 +9,7 @@ import (
"context"
"fmt"
"net"
"path/filepath"
"strings"
"time"
@ -108,7 +109,7 @@ func (o *APID) Runner(config runtime.Configurator) (runner.Runner, error) {
mounts := []specs.Mount{
{Type: "bind", Destination: "/etc/ssl", Source: "/etc/ssl", Options: []string{"bind", "ro"}},
{Type: "bind", Destination: constants.ConfigPath, Source: constants.ConfigPath, Options: []string{"rbind", "ro"}},
{Type: "bind", Destination: constants.SystemRunPath, Source: constants.SystemRunPath, Options: []string{"bind", "ro"}},
{Type: "bind", Destination: filepath.Dir(constants.RouterdSocketPath), Source: filepath.Dir(constants.RouterdSocketPath), Options: []string{"rbind", "ro"}},
}
env := []string{}

View File

@ -0,0 +1,138 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// nolint: dupl,golint
package services
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
containerdapi "github.com/containerd/containerd"
"github.com/containerd/containerd/oci"
specs "github.com/opencontainers/runtime-spec/specs-go"
"google.golang.org/grpc"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/health"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/runner"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/runner/containerd"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/runner/restart"
"github.com/talos-systems/talos/internal/pkg/conditions"
"github.com/talos-systems/talos/internal/pkg/runtime"
"github.com/talos-systems/talos/pkg/constants"
"github.com/talos-systems/talos/pkg/grpc/dialer"
)
// Routerd implements the Service interface. It serves as the concrete type with
// the required methods.
type Routerd struct{}
// ID implements the Service interface.
func (o *Routerd) ID(config runtime.Configurator) string {
return "routerd"
}
// PreFunc implements the Service interface.
func (o *Routerd) PreFunc(ctx context.Context, config runtime.Configurator) error {
importer := containerd.NewImporter(constants.SystemContainerdNamespace, containerd.WithContainerdAddress(constants.SystemContainerdAddress))
return importer.Import(&containerd.ImportRequest{
Path: "/usr/images/routerd.tar",
Options: []containerdapi.ImportOpt{
containerdapi.WithIndexName("talos/routerd"),
},
})
}
// PostFunc implements the Service interface.
func (o *Routerd) PostFunc(config runtime.Configurator) (err error) {
return nil
}
// Condition implements the Service interface.
func (o *Routerd) Condition(config runtime.Configurator) conditions.Condition {
return nil
}
// DependsOn implements the Service interface.
func (o *Routerd) DependsOn(config runtime.Configurator) []string {
return []string{"system-containerd"}
}
func (o *Routerd) Runner(config runtime.Configurator) (runner.Runner, error) {
image := "talos/routerd"
// Set the process arguments.
args := runner.Args{
ID: o.ID(config),
ProcessArgs: []string{
"/routerd",
},
}
// Ensure socket dir exists
if err := os.MkdirAll(filepath.Dir(constants.RouterdSocketPath), 0750); err != nil {
return nil, err
}
// Set the mounts.
mounts := []specs.Mount{
{Type: "bind", Destination: "/tmp", Source: "/tmp", Options: []string{"rbind", "rshared", "rw"}},
{Type: "bind", Destination: constants.SystemRunPath, Source: constants.SystemRunPath, Options: []string{"bind", "ro"}},
{Type: "bind", Destination: filepath.Dir(constants.RouterdSocketPath), Source: filepath.Dir(constants.RouterdSocketPath), Options: []string{"rbind", "rw"}},
}
env := []string{}
for key, val := range config.Machine().Env() {
switch strings.ToLower(key) {
// explicitly exclude proxy variables from routerd since this will
// negatively impact grpc connections.
// ref: https://github.com/grpc/grpc-go/blob/0f32486dd3c9bc29705535bd7e2e43801824cbc4/clientconn.go#L199-L206
// ref: https://github.com/grpc/grpc-go/blob/63ae68c9686cc0dd26c4f7476d66bb2f5c31789f/proxy.go#L118-L144
case "no_proxy":
case "http_proxy":
case "https_proxy":
default:
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
}
return restart.New(containerd.NewRunner(
config.Debug(),
&args,
runner.WithContainerdAddress(constants.SystemContainerdAddress),
runner.WithContainerImage(image),
runner.WithEnv(env),
runner.WithOCISpecOpts(
oci.WithMounts(mounts),
),
),
restart.WithType(restart.Forever),
), nil
}
// HealthFunc implements the HealthcheckedService interface
func (o *Routerd) HealthFunc(runtime.Configurator) health.Check {
return func(ctx context.Context) error {
conn, err := grpc.Dial(
fmt.Sprintf("%s://%s", "unix", constants.RouterdSocketPath),
grpc.WithInsecure(),
grpc.WithContextDialer(dialer.DialUnix()),
)
if err != nil {
return err
}
return conn.Close()
}
}
// HealthSettings implements the HealthcheckedService interface
func (o *Routerd) HealthSettings(runtime.Configurator) *health.Settings {
return &health.DefaultSettings
}

View File

@ -0,0 +1,18 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package services_test
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/talos-systems/talos/internal/app/machined/pkg/system"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/services"
)
func TestRouterdInterfaces(t *testing.T) {
assert.Implements(t, (*system.HealthcheckedService)(nil), new(services.Routerd))
}

View File

@ -0,0 +1,52 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package main
import (
"log"
"google.golang.org/grpc"
"github.com/talos-systems/grpc-proxy/proxy"
"github.com/talos-systems/talos/internal/app/routerd/pkg/director"
"github.com/talos-systems/talos/pkg/constants"
"github.com/talos-systems/talos/pkg/grpc/factory"
"github.com/talos-systems/talos/pkg/grpc/proxy/backend"
"github.com/talos-systems/talos/pkg/startup"
)
func main() {
log.SetFlags(log.Lshortfile | log.Ldate | log.Lmicroseconds | log.Ltime)
if err := startup.RandSeed(); err != nil {
log.Fatalf("failed to seed RNG: %v", err)
}
router := director.NewRouter()
// TODO: this should be dynamic based on plugin registration
router.RegisterLocalBackend("os.OSService", backend.NewLocal("osd", constants.OSSocketPath))
router.RegisterLocalBackend("machine.MachineService", backend.NewLocal("machined", constants.MachineSocketPath))
router.RegisterLocalBackend("time.TimeService", backend.NewLocal("timed", constants.TimeSocketPath))
router.RegisterLocalBackend("network.NetworkService", backend.NewLocal("networkd", constants.NetworkSocketPath))
err := factory.ListenAndServe(
router,
factory.Network("unix"),
factory.SocketPath(constants.RouterdSocketPath),
factory.WithDefaultLog(),
factory.ServerOptions(
grpc.CustomCodec(proxy.Codec()),
grpc.UnknownServiceHandler(
proxy.TransparentHandler(
router.Director,
)),
),
)
if err != nil {
log.Fatalf("listen: %v", err)
}
}

View File

@ -0,0 +1,57 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// Package director provides proxy call routing facility
package director
import (
"context"
"fmt"
"strings"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/talos-systems/grpc-proxy/proxy"
)
// Router wraps grpc-proxy StreamDirector
type Router struct {
localBackends map[string]proxy.Backend
}
// NewRouter builds new Router
func NewRouter() *Router {
return &Router{
localBackends: map[string]proxy.Backend{},
}
}
// Register is no-op to implement factory.Registrator interface.
//
// Actual proxy handler is installed via grpc.UnknownServiceHandler option.
func (r *Router) Register(srv *grpc.Server) {
}
// Director implements proxy.StreamDirector function
func (r *Router) Director(ctx context.Context, fullMethodName string) (proxy.Mode, []proxy.Backend, error) {
parts := strings.SplitN(fullMethodName, "/", 3)
serviceName := parts[1]
if backend, ok := r.localBackends[serviceName]; ok {
return proxy.One2One, []proxy.Backend{backend}, nil
}
return proxy.One2One, nil, status.Errorf(codes.Unknown, "service %v is not defined", serviceName)
}
// RegisterLocalBackend registers local backend by service name.
func (r *Router) RegisterLocalBackend(serviceName string, backend proxy.Backend) {
if _, exists := r.localBackends[serviceName]; exists {
panic(fmt.Sprintf("local backend %v already registered", serviceName))
}
r.localBackends[serviceName] = backend
}

View File

@ -0,0 +1,69 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package director_test
import (
"context"
"testing"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc/metadata"
"github.com/talos-systems/grpc-proxy/proxy"
"github.com/talos-systems/talos/internal/app/routerd/pkg/director"
)
type DirectorSuite struct {
suite.Suite
router *director.Router
}
func (suite *DirectorSuite) SetupSuite() {
suite.router = director.NewRouter()
}
func (suite *DirectorSuite) TestRegisterLocalBackend() {
suite.router.RegisterLocalBackend("a.A", &mockBackend{})
suite.router.RegisterLocalBackend("b.B", &mockBackend{})
suite.Require().Panics(func() { suite.router.RegisterLocalBackend("a.A", &mockBackend{}) })
}
func (suite *DirectorSuite) TestDirectorLocal() {
ctx := context.Background()
mode, backends, err := suite.router.Director(ctx, "/service.Service/method")
suite.Assert().Equal(proxy.One2One, mode)
suite.Assert().Nil(backends)
suite.Assert().EqualError(err, "rpc error: code = Unknown desc = service service.Service is not defined")
suite.router.RegisterLocalBackend("service.Service", &mockBackend{target: "local"})
mode, backends, err = suite.router.Director(ctx, "/service.Service/method")
suite.Assert().Equal(proxy.One2One, mode)
suite.Assert().Len(backends, 1)
suite.Assert().Equal("local", backends[0].(*mockBackend).target)
suite.Assert().NoError(err)
ctxProxyFrom := metadata.NewIncomingContext(ctx, metadata.Pairs("proxyfrom", "127.0.0.1"))
mode, backends, err = suite.router.Director(ctxProxyFrom, "/service.Service/method")
suite.Assert().Equal(proxy.One2One, mode)
suite.Assert().Len(backends, 1)
suite.Assert().Equal("local", backends[0].(*mockBackend).target)
suite.Assert().NoError(err)
ctxNoTargets := metadata.NewIncomingContext(ctx, metadata.Pairs(":authority", "127.0.0.1"))
mode, backends, err = suite.router.Director(ctxNoTargets, "/service.Service/method")
suite.Assert().Equal(proxy.One2One, mode)
suite.Assert().Len(backends, 1)
suite.Assert().Equal("local", backends[0].(*mockBackend).target)
suite.Assert().NoError(err)
}
func TestDirectorSuite(t *testing.T) {
suite.Run(t, new(DirectorSuite))
}

View File

@ -0,0 +1,31 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package director_test
import (
"context"
"google.golang.org/grpc"
)
type mockBackend struct {
target string
}
func (m *mockBackend) String() string {
return m.target
}
func (m *mockBackend) GetConnection(ctx context.Context) (context.Context, *grpc.ClientConn, error) {
return ctx, nil, nil
}
func (m *mockBackend) AppendInfo(streaming bool, resp []byte) ([]byte, error) {
return resp, nil
}
func (m *mockBackend) BuildError(streaming bool, err error) ([]byte, error) {
return nil, nil
}

View File

@ -221,6 +221,9 @@ const (
// OSSocketPath is the path to file socket of os API.
OSSocketPath = SystemRunPath + "/osd/osd.sock"
// RouterdSocketPath is the path to file socket of router API.
RouterdSocketPath = SystemRunPath + "/routerd/routerd.sock"
// KernelUncompressedAsset defines a well known name for our uncompressed kernel filename
KernelUncompressedAsset = "vmlinux"

View File

@ -0,0 +1,6 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// Package backend implements common proxy backends satisfying proxy.Backend interface
package backend

View File

@ -9,10 +9,11 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/talos-systems/grpc-proxy/proxy"
"google.golang.org/grpc/metadata"
"github.com/talos-systems/talos/internal/app/apid/pkg/backend"
"github.com/talos-systems/grpc-proxy/proxy"
"github.com/talos-systems/talos/pkg/grpc/proxy/backend"
)
func TestLocalInterfaces(t *testing.T) {