From 5ecddf286627d8cc8d347ffe892bc9266cbe207b Mon Sep 17 00:00:00 2001 From: Andrey Smirnov Date: Wed, 8 Jul 2020 21:33:19 +0300 Subject: [PATCH] feat: add round-robin LB policy to Talos client by default Handling of multiple endpoints has already been implemented in #2094. This PR enables round-robin policy so that grpc picks up new endpoint for each call (and not send each request to the first control plane node). Endpoint list is randomized to handle cases when only one request is going to be sent, so that it doesn't go always to the first node in the list. gprc handles dead/unresponsive nodes automatically for us. `talosctl cluster create` and provision tests switched to use client-side load balancer for Talos API. On the additional improvements we got: * `talosctl` now reports correct node IP when using commands without `-n`, not the loadbalancer IP (if using multiple endpoints of course) * loadbalancer can't provide reliable handling of errors when upstream server is unresponsive or there're no upstreams available, grpc returns much more helpful errors Fixes #1641 Signed-off-by: Andrey Smirnov --- cmd/talosctl/cmd/mgmt/cluster/create.go | 7 ++- internal/integration/api/events.go | 29 +++++----- internal/integration/api/logs.go | 31 ++++++---- internal/integration/provision/upgrade.go | 9 ++- internal/pkg/cluster/check/default.go | 2 +- internal/pkg/cluster/check/reporter.go | 3 +- internal/pkg/cluster/check/service.go | 70 +++++++++++------------ pkg/client/events.go | 4 ++ pkg/client/resolver.go | 32 +++++++++-- 9 files changed, 117 insertions(+), 70 deletions(-) diff --git a/cmd/talosctl/cmd/mgmt/cluster/create.go b/cmd/talosctl/cmd/mgmt/cluster/create.go index 8ae0cba80..bd279a4dd 100644 --- a/cmd/talosctl/cmd/mgmt/cluster/create.go +++ b/cmd/talosctl/cmd/mgmt/cluster/create.go @@ -202,7 +202,7 @@ func create(ctx context.Context) (err error) { })) } - defaultInternalLB, defaultExternalLB := provisioner.GetLoadBalancers(request.Network) + defaultInternalLB, _ := provisioner.GetLoadBalancers(request.Network) if defaultInternalLB == "" { // provisioner doesn't provide internal LB, so use first master node @@ -218,7 +218,10 @@ func create(ctx context.Context) (err error) { case forceInitNodeAsEndpoint: endpointList = []string{ips[0].String()} default: - endpointList = []string{defaultExternalLB} + // use control plane nodes as endpoints, client-side load-balancing + for i := 0; i < masters; i++ { + endpointList = append(endpointList, ips[i].String()) + } } genOptions = append(genOptions, generate.WithEndpointList(endpointList)) diff --git a/internal/integration/api/events.go b/internal/integration/api/events.go index c254e6aef..aa468a14b 100644 --- a/internal/integration/api/events.go +++ b/internal/integration/api/events.go @@ -9,6 +9,7 @@ package api import ( "context" "fmt" + "math/rand" "time" "github.com/talos-systems/talos/api/machine" @@ -22,6 +23,8 @@ type EventsSuite struct { ctx context.Context ctxCancel context.CancelFunc + + nodeCtx context.Context } // SuiteName ... @@ -32,7 +35,13 @@ func (suite *EventsSuite) SuiteName() string { // SetupTest ... func (suite *EventsSuite) SetupTest() { // make sure API calls have timeout - suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 2*time.Minute) + suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 30*time.Second) + + nodes := suite.DiscoverNodes() + suite.Require().NotEmpty(nodes) + node := nodes[rand.Intn(len(nodes))] + + suite.nodeCtx = client.WithNodes(suite.ctx, node) } // TearDownTest ... @@ -44,10 +53,7 @@ func (suite *EventsSuite) TearDownTest() { func (suite *EventsSuite) TestServiceEvents() { const service = "timed" // any restartable service should work - ctx, ctxCancel := context.WithTimeout(suite.ctx, 30*time.Second) - defer ctxCancel() - - svcInfo, err := suite.Client.ServiceInfo(ctx, service) + svcInfo, err := suite.Client.ServiceInfo(suite.nodeCtx, service) suite.Require().NoError(err) if len(svcInfo) == 0 { // service is not registered (e.g. docker) @@ -73,8 +79,8 @@ func (suite *EventsSuite) TestServiceEvents() { } go func() { - suite.Assert().NoError(suite.Client.EventsWatch(ctx, func(ch <-chan client.Event) { - defer ctxCancel() + suite.Assert().NoError(suite.Client.EventsWatch(suite.nodeCtx, func(ch <-chan client.Event) { + defer suite.ctxCancel() for event := range ch { if msg, ok := event.Payload.(*machine.ServiceStateEvent); ok && msg.GetService() == service { @@ -91,23 +97,20 @@ func (suite *EventsSuite) TestServiceEvents() { // wait for event watcher to start time.Sleep(200 * time.Millisecond) - _, err = suite.Client.ServiceRestart(ctx, service) + _, err = suite.Client.ServiceRestart(suite.nodeCtx, service) suite.Assert().NoError(err) - <-ctx.Done() + <-suite.ctx.Done() suite.Require().NoError(checkExpectedActions()) } // TestEventsWatch verifies events watch API. func (suite *EventsSuite) TestEventsWatch() { - ctx, ctxCancel := context.WithTimeout(suite.ctx, 30*time.Second) - defer ctxCancel() - receiveEvents := func(opts ...client.EventsOptionFunc) []client.Event { result := []client.Event{} - watchCtx, watchCtxCancel := context.WithCancel(ctx) + watchCtx, watchCtxCancel := context.WithCancel(suite.nodeCtx) defer watchCtxCancel() suite.Assert().NoError(suite.Client.EventsWatch(watchCtx, func(ch <-chan client.Event) { diff --git a/internal/integration/api/logs.go b/internal/integration/api/logs.go index 55b634f16..f33a3d8cc 100644 --- a/internal/integration/api/logs.go +++ b/internal/integration/api/logs.go @@ -12,6 +12,7 @@ import ( "context" "io" "io/ioutil" + "math/rand" "time" "github.com/talos-systems/talos/api/common" @@ -26,6 +27,8 @@ type LogsSuite struct { ctx context.Context ctxCancel context.CancelFunc + + nodeCtx context.Context } // SuiteName ... @@ -37,6 +40,12 @@ func (suite *LogsSuite) SuiteName() string { func (suite *LogsSuite) SetupTest() { // make sure API calls have timeout suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 2*time.Minute) + + nodes := suite.DiscoverNodes() + suite.Require().NotEmpty(nodes) + node := nodes[rand.Intn(len(nodes))] + + suite.nodeCtx = client.WithNodes(suite.ctx, node) } // TearDownTest ... @@ -46,7 +55,7 @@ func (suite *LogsSuite) TearDownTest() { // TestServicesHaveLogs verifies that each service has logs. func (suite *LogsSuite) TestServicesHaveLogs() { - servicesReply, err := suite.Client.ServiceList(suite.ctx) + servicesReply, err := suite.Client.ServiceList(suite.nodeCtx) suite.Require().NoError(err) suite.Require().Len(servicesReply.Messages, 1) @@ -55,7 +64,7 @@ func (suite *LogsSuite) TestServicesHaveLogs() { for _, svc := range servicesReply.Messages[0].Services { logsStream, err := suite.Client.Logs( - suite.ctx, + suite.nodeCtx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD, svc.Id, @@ -84,13 +93,13 @@ func (suite *LogsSuite) TestTail() { // invoke machined enough times to generate // some logs for i := 0; i < 20; i++ { - _, err := suite.Client.Version(suite.ctx) + _, err := suite.Client.Version(suite.nodeCtx) suite.Require().NoError(err) } for _, tailLines := range []int32{0, 1, 10} { logsStream, err := suite.Client.Logs( - suite.ctx, + suite.nodeCtx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD, "apid", @@ -122,7 +131,7 @@ func (suite *LogsSuite) TestTail() { // TestServiceNotFound verifies error if service name is not found. func (suite *LogsSuite) TestServiceNotFound() { logsStream, err := suite.Client.Logs( - suite.ctx, + suite.nodeCtx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD, "nosuchservice", @@ -133,10 +142,10 @@ func (suite *LogsSuite) TestServiceNotFound() { suite.Require().NoError(logsStream.CloseSend()) - _, err = logsStream.Recv() - suite.Require().Error(err) + msg, err := logsStream.Recv() + suite.Require().NoError(err) - suite.Require().Regexp(`.+log "nosuchservice" was not registered$`, err.Error()) + suite.Require().Regexp(`.+log "nosuchservice" was not registered$`, msg.Metadata.Error) } // TestStreaming verifies that logs are streamed in real-time. @@ -159,13 +168,13 @@ func (suite *LogsSuite) testStreaming(tailLines int32) { // invoke osd enough times to generate // some logs for i := int32(0); i < tailLines; i++ { - _, err := suite.Client.Stats(suite.ctx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD) + _, err := suite.Client.Stats(suite.nodeCtx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD) suite.Require().NoError(err) } } logsStream, err := suite.Client.Logs( - suite.ctx, + suite.nodeCtx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD, "osd", @@ -221,7 +230,7 @@ DrainLoop: } // invoke osd API - _, err = suite.Client.Stats(suite.ctx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD) + _, err = suite.Client.Stats(suite.nodeCtx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD) suite.Require().NoError(err) // there should be a line in the logs diff --git a/internal/integration/provision/upgrade.go b/internal/integration/provision/upgrade.go index 1b288482c..241e4b4fb 100644 --- a/internal/integration/provision/upgrade.go +++ b/internal/integration/provision/upgrade.go @@ -245,7 +245,7 @@ func (suite *UpgradeSuite) setupCluster() { StateDirectory: suite.stateDir, } - defaultInternalLB, defaultExternalLB := suite.provisioner.GetLoadBalancers(request.Network) + defaultInternalLB, _ := suite.provisioner.GetLoadBalancers(request.Network) genOptions := suite.provisioner.GenOptions(request.Network) @@ -256,6 +256,11 @@ func (suite *UpgradeSuite) setupCluster() { genOptions = append(genOptions, generate.WithRegistryMirror(parts[0], parts[1])) } + masterEndpoints := make([]string, suite.spec.MasterNodes) + for i := range masterEndpoints { + masterEndpoints[i] = ips[i].String() + } + suite.configBundle, err = config.NewConfigBundle(config.WithInputOptions( &config.InputOptions{ ClusterName: clusterName, @@ -263,7 +268,7 @@ func (suite *UpgradeSuite) setupCluster() { KubeVersion: "", // keep empty so that default version is used per Talos version GenOptions: append( genOptions, - generate.WithEndpointList([]string{defaultExternalLB}), + generate.WithEndpointList(masterEndpoints), generate.WithInstallImage(suite.spec.SourceInstallerImage), ), })) diff --git a/internal/pkg/cluster/check/default.go b/internal/pkg/cluster/check/default.go index 273c52c9f..b97949cdb 100644 --- a/internal/pkg/cluster/check/default.go +++ b/internal/pkg/cluster/check/default.go @@ -59,7 +59,7 @@ func DefaultClusterChecks() []ClusterCheck { func(cluster ClusterInfo) conditions.Condition { return conditions.PollingCondition("all control plane components to be ready", func(ctx context.Context) error { return K8sFullControlPlaneAssertion(ctx, cluster) - }, 2*time.Minute, 5*time.Second) + }, 5*time.Minute, 5*time.Second) }, // wait for kube-proxy to report ready func(cluster ClusterInfo) conditions.Condition { diff --git a/internal/pkg/cluster/check/reporter.go b/internal/pkg/cluster/check/reporter.go index 57c98ada4..13878ccf0 100644 --- a/internal/pkg/cluster/check/reporter.go +++ b/internal/pkg/cluster/check/reporter.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "os" + "strings" "github.com/talos-systems/talos/internal/pkg/conditions" ) @@ -21,7 +22,7 @@ func (wr *writerReporter) Update(condition conditions.Condition) { line := fmt.Sprintf("waiting for %s", condition) if line != wr.lastLine { - fmt.Fprintln(wr.w, line) + fmt.Fprintln(wr.w, strings.TrimSpace(line)) wr.lastLine = line } } diff --git a/internal/pkg/cluster/check/service.go b/internal/pkg/cluster/check/service.go index b9c8de8e2..eb47742db 100644 --- a/internal/pkg/cluster/check/service.go +++ b/internal/pkg/cluster/check/service.go @@ -10,6 +10,8 @@ import ( "fmt" "sort" + "github.com/hashicorp/go-multierror" + "github.com/talos-systems/talos/internal/app/machined/pkg/runtime" "github.com/talos-systems/talos/pkg/client" ) @@ -26,58 +28,42 @@ func ServiceStateAssertion(ctx context.Context, cluster ClusterInfo, service str return err } - var node string + // by default, we check all control plane nodes. if some nodes don't have that service running, + // it won't be returned in the response + nodes := append(cluster.NodesByType(runtime.MachineTypeInit), cluster.NodesByType(runtime.MachineTypeControlPlane)...) + nodesCtx := client.WithNodes(ctx, nodes...) - switch { - case len(cluster.NodesByType(runtime.MachineTypeInit)) > 0: - nodes := cluster.NodesByType(runtime.MachineTypeInit) - if len(nodes) != 1 { - return fmt.Errorf("expected 1 init node, got %d", len(nodes)) - } - - node = nodes[0] - case len(cluster.NodesByType(runtime.MachineTypeControlPlane)) > 0: - nodes := cluster.NodesByType(runtime.MachineTypeControlPlane) - - sort.Strings(nodes) - - node = nodes[0] - default: - return fmt.Errorf("no bootstrap node found") - } - - nodeCtx := client.WithNodes(ctx, node) - - servicesInfo, err := cli.ServiceInfo(nodeCtx, service) + servicesInfo, err := cli.ServiceInfo(nodesCtx, service) if err != nil { return err } - serviceOk := false + if len(servicesInfo) == 0 { + return ErrServiceNotFound + } acceptedStates := map[string]struct{}{} for _, state := range states { acceptedStates[state] = struct{}{} } + var multiErr *multierror.Error + for _, serviceInfo := range servicesInfo { + node := serviceInfo.Metadata.GetHostname() + if len(serviceInfo.Service.Events.Events) == 0 { - return fmt.Errorf("no events recorded yet for service %q", service) + multiErr = multierror.Append(multiErr, fmt.Errorf("%s: no events recorded yet for service %q", node, service)) + continue } lastEvent := serviceInfo.Service.Events.Events[len(serviceInfo.Service.Events.Events)-1] if _, ok := acceptedStates[lastEvent.State]; !ok { - return fmt.Errorf("service %q not in expected state %q: current state [%s] %s", service, states, lastEvent.State, lastEvent.Msg) + multiErr = multierror.Append(multiErr, fmt.Errorf("%s: service %q not in expected state %q: current state [%s] %s", node, service, states, lastEvent.State, lastEvent.Msg)) } - - serviceOk = true } - if !serviceOk { - return ErrServiceNotFound - } - - return nil + return multiErr.ErrorOrNil() } // ServiceHealthAssertion checks whether service reached some specified state. @@ -119,20 +105,32 @@ func ServiceHealthAssertion(ctx context.Context, cluster ClusterInfo, service st return fmt.Errorf("expected a response with %d node(s), got %d", count, len(servicesInfo)) } + var multiErr *multierror.Error + + // sort service info list so that errors returned are consistent + sort.Slice(servicesInfo, func(i, j int) bool { + return servicesInfo[i].Metadata.GetHostname() < servicesInfo[j].Metadata.GetHostname() + }) + for _, serviceInfo := range servicesInfo { + node := serviceInfo.Metadata.GetHostname() + if len(serviceInfo.Service.Events.Events) == 0 { - return fmt.Errorf("no events recorded yet for service %q", service) + multiErr = multierror.Append(multiErr, fmt.Errorf("%s: no events recorded yet for service %q", node, service)) + continue } lastEvent := serviceInfo.Service.Events.Events[len(serviceInfo.Service.Events.Events)-1] if lastEvent.State != "Running" { - return fmt.Errorf("service %q not in expected state %q: current state [%s] %s", service, "Running", lastEvent.State, lastEvent.Msg) + multiErr = multierror.Append(multiErr, fmt.Errorf("%s: service %q not in expected state %q: current state [%s] %s", node, service, "Running", lastEvent.State, lastEvent.Msg)) + continue } if !serviceInfo.Service.GetHealth().GetHealthy() { - return fmt.Errorf("service is not healthy: %s", service) + multiErr = multierror.Append(multiErr, fmt.Errorf("%s: service is not healthy: %s", node, service)) + continue } } - return nil + return multiErr.ErrorOrNil() } diff --git a/pkg/client/events.go b/pkg/client/events.go index d3e0c0ac9..167fc817b 100644 --- a/pkg/client/events.go +++ b/pkg/client/events.go @@ -73,6 +73,10 @@ func (c *Client) EventsWatch(ctx context.Context, watchFunc func(<-chan Event), return fmt.Errorf("error fetching events: %s", err) } + if err = stream.CloseSend(); err != nil { + return err + } + defaultNode := RemotePeer(stream.Context()) var wg sync.WaitGroup diff --git a/pkg/client/resolver.go b/pkg/client/resolver.go index b8e1d8f53..03d484010 100644 --- a/pkg/client/resolver.go +++ b/pkg/client/resolver.go @@ -6,6 +6,7 @@ package client import ( "fmt" + "math/rand" "strings" "google.golang.org/grpc/resolver" @@ -28,7 +29,10 @@ func (b *talosListResolverBuilder) Build(target resolver.Target, cc resolver.Cli target: target, cc: cc, } - r.start() + + if err := r.start(); err != nil { + return nil, err + } return r, nil } @@ -43,7 +47,7 @@ type talosListResolver struct { cc resolver.ClientConn } -func (r *talosListResolver) start() { +func (r *talosListResolver) start() error { var addrs []resolver.Address // nolint: prealloc for _, a := range strings.Split(r.target.Endpoint, ",") { @@ -53,9 +57,29 @@ func (r *talosListResolver) start() { }) } - r.cc.UpdateState(resolver.State{ - Addresses: addrs, + // shuffle the list in case client does just one request + rand.Shuffle(len(addrs), func(i, j int) { + addrs[i], addrs[j] = addrs[j], addrs[i] }) + + serviceConfigJSON := `{ + "loadBalancingConfig": [{ + "round_robin": {} + }] + }` + + parsedServiceConfig := r.cc.ParseServiceConfig(serviceConfigJSON) + + if parsedServiceConfig.Err != nil { + return parsedServiceConfig.Err + } + + r.cc.UpdateState(resolver.State{ + Addresses: addrs, + ServiceConfig: parsedServiceConfig, + }) + + return nil } // ResolveNow implements resolver.Resolver.