diff --git a/cmd/talosctl/cmd/mgmt/cluster/create.go b/cmd/talosctl/cmd/mgmt/cluster/create.go
index 8ae0cba80..bd279a4dd 100644
--- a/cmd/talosctl/cmd/mgmt/cluster/create.go
+++ b/cmd/talosctl/cmd/mgmt/cluster/create.go
@@ -202,7 +202,7 @@ func create(ctx context.Context) (err error) {
 		}))
 	}
 
-	defaultInternalLB, defaultExternalLB := provisioner.GetLoadBalancers(request.Network)
+	defaultInternalLB, _ := provisioner.GetLoadBalancers(request.Network)
 
 	if defaultInternalLB == "" {
 		// provisioner doesn't provide internal LB, so use first master node
@@ -218,7 +218,10 @@ func create(ctx context.Context) (err error) {
 	case forceInitNodeAsEndpoint:
 		endpointList = []string{ips[0].String()}
 	default:
-		endpointList = []string{defaultExternalLB}
+		// use control plane nodes as endpoints, client-side load-balancing
+		for i := 0; i < masters; i++ {
+			endpointList = append(endpointList, ips[i].String())
+		}
 	}
 
 	genOptions = append(genOptions, generate.WithEndpointList(endpointList))
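The default branch above drops the dependency on a single external load balancer: every control plane IP goes into the generated machine config's endpoint list, and the client balances across them. A minimal standalone sketch of that endpoint-list construction (the sample addresses are made up; real values come from request.Network):

```go
package main

import (
	"fmt"
	"net"
)

// buildEndpointList mirrors the new default branch above: instead of one
// external load balancer address, every control plane node IP becomes a
// client endpoint.
func buildEndpointList(ips []net.IP, masters int) []string {
	endpointList := make([]string, 0, masters)

	for i := 0; i < masters; i++ {
		endpointList = append(endpointList, ips[i].String())
	}

	return endpointList
}

func main() {
	// hypothetical provisioner-assigned addresses
	ips := []net.IP{
		net.ParseIP("10.5.0.2"),
		net.ParseIP("10.5.0.3"),
		net.ParseIP("10.5.0.4"),
	}

	fmt.Println(buildEndpointList(ips, 3)) // [10.5.0.2 10.5.0.3 10.5.0.4]
}
```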
diff --git a/internal/integration/api/events.go b/internal/integration/api/events.go
index c254e6aef..aa468a14b 100644
--- a/internal/integration/api/events.go
+++ b/internal/integration/api/events.go
@@ -9,6 +9,7 @@ package api
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"time"
 
 	"github.com/talos-systems/talos/api/machine"
@@ -22,6 +23,8 @@ type EventsSuite struct {
 	ctx       context.Context
 	ctxCancel context.CancelFunc
+
+	nodeCtx context.Context
 }
 
 // SuiteName ...
@@ -31,7 +34,13 @@ func (suite *EventsSuite) SuiteName() string {
 
 // SetupTest ...
 func (suite *EventsSuite) SetupTest() {
 	// make sure API calls have timeout
-	suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 2*time.Minute)
+	suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 30*time.Second)
+
+	nodes := suite.DiscoverNodes()
+	suite.Require().NotEmpty(nodes)
+	node := nodes[rand.Intn(len(nodes))]
+
+	suite.nodeCtx = client.WithNodes(suite.ctx, node)
 }
 
 // TearDownTest ...
@@ -44,10 +53,7 @@ func (suite *EventsSuite) TestServiceEvents() {
 	const service = "timed" // any restartable service should work
 
-	ctx, ctxCancel := context.WithTimeout(suite.ctx, 30*time.Second)
-	defer ctxCancel()
-
-	svcInfo, err := suite.Client.ServiceInfo(ctx, service)
+	svcInfo, err := suite.Client.ServiceInfo(suite.nodeCtx, service)
 	suite.Require().NoError(err)
 
 	if len(svcInfo) == 0 {
 		// service is not registered (e.g. docker)
@@ -73,8 +79,8 @@ func (suite *EventsSuite) TestServiceEvents() {
 	}
 
 	go func() {
-		suite.Assert().NoError(suite.Client.EventsWatch(ctx, func(ch <-chan client.Event) {
-			defer ctxCancel()
+		suite.Assert().NoError(suite.Client.EventsWatch(suite.nodeCtx, func(ch <-chan client.Event) {
+			defer suite.ctxCancel()
 
 			for event := range ch {
 				if msg, ok := event.Payload.(*machine.ServiceStateEvent); ok && msg.GetService() == service {
@@ -91,23 +97,20 @@ func (suite *EventsSuite) TestServiceEvents() {
 	// wait for event watcher to start
 	time.Sleep(200 * time.Millisecond)
 
-	_, err = suite.Client.ServiceRestart(ctx, service)
+	_, err = suite.Client.ServiceRestart(suite.nodeCtx, service)
 	suite.Assert().NoError(err)
 
-	<-ctx.Done()
+	<-suite.ctx.Done()
 
 	suite.Require().NoError(checkExpectedActions())
 }
 
 // TestEventsWatch verifies events watch API.
 func (suite *EventsSuite) TestEventsWatch() {
-	ctx, ctxCancel := context.WithTimeout(suite.ctx, 30*time.Second)
-	defer ctxCancel()
-
 	receiveEvents := func(opts ...client.EventsOptionFunc) []client.Event {
 		result := []client.Event{}
 
-		watchCtx, watchCtxCancel := context.WithCancel(ctx)
+		watchCtx, watchCtxCancel := context.WithCancel(suite.nodeCtx)
 		defer watchCtxCancel()
 
 		suite.Assert().NoError(suite.Client.EventsWatch(watchCtx, func(ch <-chan client.Event) {
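The integration suites now pin all API calls to one randomly chosen node via client.WithNodes, so each test run exercises a single concrete member of the endpoint list rather than whichever backend the balancer happens to pick per call. A minimal sketch of what client.WithNodes is assumed to do; the "nodes" gRPC metadata key is an assumption for illustration, not a verified implementation detail:

```go
package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/metadata"
)

// withNodes approximates client.WithNodes: it records the target nodes as
// outgoing gRPC metadata, which apid is assumed to use when routing the
// proxied request. The "nodes" key here is an assumption.
func withNodes(ctx context.Context, nodes ...string) context.Context {
	kv := make([]string, 0, len(nodes)*2)
	for _, node := range nodes {
		kv = append(kv, "nodes", node)
	}

	return metadata.AppendToOutgoingContext(ctx, kv...)
}

func main() {
	ctx := withNodes(context.Background(), "10.5.0.2") // hypothetical node address

	md, _ := metadata.FromOutgoingContext(ctx)
	fmt.Println(md.Get("nodes")) // [10.5.0.2]
}
```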
diff --git a/internal/integration/api/logs.go b/internal/integration/api/logs.go
index 55b634f16..f33a3d8cc 100644
--- a/internal/integration/api/logs.go
+++ b/internal/integration/api/logs.go
@@ -12,6 +12,7 @@ import (
 	"context"
 	"io"
 	"io/ioutil"
+	"math/rand"
 	"time"
 
 	"github.com/talos-systems/talos/api/common"
@@ -26,6 +27,8 @@ type LogsSuite struct {
 	ctx       context.Context
 	ctxCancel context.CancelFunc
+
+	nodeCtx context.Context
 }
 
 // SuiteName ...
@@ -37,6 +40,12 @@ func (suite *LogsSuite) SuiteName() string {
 
 // SetupTest ...
 func (suite *LogsSuite) SetupTest() {
 	// make sure API calls have timeout
 	suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 2*time.Minute)
+
+	nodes := suite.DiscoverNodes()
+	suite.Require().NotEmpty(nodes)
+	node := nodes[rand.Intn(len(nodes))]
+
+	suite.nodeCtx = client.WithNodes(suite.ctx, node)
 }
 
 // TearDownTest ...
@@ -46,7 +55,7 @@ func (suite *LogsSuite) TearDownTest() {
 
 // TestServicesHaveLogs verifies that each service has logs.
 func (suite *LogsSuite) TestServicesHaveLogs() {
-	servicesReply, err := suite.Client.ServiceList(suite.ctx)
+	servicesReply, err := suite.Client.ServiceList(suite.nodeCtx)
 	suite.Require().NoError(err)
 
 	suite.Require().Len(servicesReply.Messages, 1)
@@ -55,7 +64,7 @@ func (suite *LogsSuite) TestServicesHaveLogs() {
 
 	for _, svc := range servicesReply.Messages[0].Services {
 		logsStream, err := suite.Client.Logs(
-			suite.ctx,
+			suite.nodeCtx,
 			constants.SystemContainerdNamespace,
 			common.ContainerDriver_CONTAINERD,
 			svc.Id,
@@ -84,13 +93,13 @@ func (suite *LogsSuite) TestTail() {
 	// invoke machined enough times to generate
 	// some logs
 	for i := 0; i < 20; i++ {
-		_, err := suite.Client.Version(suite.ctx)
+		_, err := suite.Client.Version(suite.nodeCtx)
 		suite.Require().NoError(err)
 	}
 
 	for _, tailLines := range []int32{0, 1, 10} {
 		logsStream, err := suite.Client.Logs(
-			suite.ctx,
+			suite.nodeCtx,
 			constants.SystemContainerdNamespace,
 			common.ContainerDriver_CONTAINERD,
 			"apid",
@@ -122,7 +131,7 @@ func (suite *LogsSuite) TestTail() {
 // TestServiceNotFound verifies error if service name is not found.
 func (suite *LogsSuite) TestServiceNotFound() {
 	logsStream, err := suite.Client.Logs(
-		suite.ctx,
+		suite.nodeCtx,
 		constants.SystemContainerdNamespace,
 		common.ContainerDriver_CONTAINERD,
 		"nosuchservice",
@@ -133,10 +142,10 @@ func (suite *LogsSuite) TestServiceNotFound() {
 
 	suite.Require().NoError(logsStream.CloseSend())
 
-	_, err = logsStream.Recv()
-	suite.Require().Error(err)
+	msg, err := logsStream.Recv()
+	suite.Require().NoError(err)
 
-	suite.Require().Regexp(`.+log "nosuchservice" was not registered$`, err.Error())
+	suite.Require().Regexp(`.+log "nosuchservice" was not registered$`, msg.Metadata.Error)
 }
 
 // TestStreaming verifies that logs are streamed in real-time.
@@ -159,13 +168,13 @@ func (suite *LogsSuite) testStreaming(tailLines int32) {
 		// invoke osd enough times to generate
 		// some logs
 		for i := int32(0); i < tailLines; i++ {
-			_, err := suite.Client.Stats(suite.ctx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD)
+			_, err := suite.Client.Stats(suite.nodeCtx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD)
 			suite.Require().NoError(err)
 		}
 	}
 
 	logsStream, err := suite.Client.Logs(
-		suite.ctx,
+		suite.nodeCtx,
 		constants.SystemContainerdNamespace,
 		common.ContainerDriver_CONTAINERD,
 		"osd",
@@ -221,7 +230,7 @@ DrainLoop:
 	}
 
 	// invoke osd API
-	_, err = suite.Client.Stats(suite.ctx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD)
+	_, err = suite.Client.Stats(suite.nodeCtx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD)
 	suite.Require().NoError(err)
 
 	// there should be a line in the logs
diff --git a/internal/integration/provision/upgrade.go b/internal/integration/provision/upgrade.go
index 1b288482c..241e4b4fb 100644
--- a/internal/integration/provision/upgrade.go
+++ b/internal/integration/provision/upgrade.go
@@ -245,7 +245,7 @@ func (suite *UpgradeSuite) setupCluster() {
 		StateDirectory: suite.stateDir,
 	}
 
-	defaultInternalLB, defaultExternalLB := suite.provisioner.GetLoadBalancers(request.Network)
+	defaultInternalLB, _ := suite.provisioner.GetLoadBalancers(request.Network)
 
 	genOptions := suite.provisioner.GenOptions(request.Network)
 
@@ -256,6 +256,11 @@ func (suite *UpgradeSuite) setupCluster() {
 		genOptions = append(genOptions, generate.WithRegistryMirror(parts[0], parts[1]))
 	}
 
+	masterEndpoints := make([]string, suite.spec.MasterNodes)
+	for i := range masterEndpoints {
+		masterEndpoints[i] = ips[i].String()
+	}
+
 	suite.configBundle, err = config.NewConfigBundle(config.WithInputOptions(
 		&config.InputOptions{
 			ClusterName: clusterName,
@@ -263,7 +268,7 @@ func (suite *UpgradeSuite) setupCluster() {
 			KubeVersion: "", // keep empty so that default version is used per Talos version
 			GenOptions: append(
 				genOptions,
-				generate.WithEndpointList([]string{defaultExternalLB}),
+				generate.WithEndpointList(masterEndpoints),
 				generate.WithInstallImage(suite.spec.SourceInstallerImage),
 			),
 		}))
diff --git a/internal/pkg/cluster/check/default.go b/internal/pkg/cluster/check/default.go
index 273c52c9f..b97949cdb 100644
--- a/internal/pkg/cluster/check/default.go
+++ b/internal/pkg/cluster/check/default.go
@@ -59,7 +59,7 @@ func DefaultClusterChecks() []ClusterCheck {
 		func(cluster ClusterInfo) conditions.Condition {
 			return conditions.PollingCondition("all control plane components to be ready", func(ctx context.Context) error {
 				return K8sFullControlPlaneAssertion(ctx, cluster)
-			}, 2*time.Minute, 5*time.Second)
+			}, 5*time.Minute, 5*time.Second)
 		},
 		// wait for kube-proxy to report ready
 		func(cluster ClusterInfo) conditions.Condition {
diff --git a/internal/pkg/cluster/check/reporter.go b/internal/pkg/cluster/check/reporter.go
index 57c98ada4..13878ccf0 100644
--- a/internal/pkg/cluster/check/reporter.go
+++ b/internal/pkg/cluster/check/reporter.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"strings"
 
 	"github.com/talos-systems/talos/internal/pkg/conditions"
 )
@@ -21,7 +22,7 @@ func (wr *writerReporter) Update(condition conditions.Condition) {
 	line := fmt.Sprintf("waiting for %s", condition)
 
 	if line != wr.lastLine {
-		fmt.Fprintln(wr.w, line)
+		fmt.Fprintln(wr.w, strings.TrimSpace(line))
 		wr.lastLine = line
 	}
 }
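The reporter fix above matters because a condition's String() output may carry trailing whitespace, producing ragged status lines. A tiny self-contained version of the changed Update method (cond is a stand-in for conditions.Condition; only String() matters here):

```go
package main

import (
	"fmt"
	"io"
	"os"
	"strings"
)

// cond stands in for conditions.Condition for this sketch.
type cond string

func (c cond) String() string { return string(c) }

type writerReporter struct {
	w        io.Writer
	lastLine string
}

// Update mirrors the patched method: the line is trimmed at print time,
// while lastLine still stores the untrimmed value used for deduplication.
func (wr *writerReporter) Update(condition fmt.Stringer) {
	line := fmt.Sprintf("waiting for %s", condition)

	if line != wr.lastLine {
		fmt.Fprintln(wr.w, strings.TrimSpace(line))
		wr.lastLine = line
	}
}

func main() {
	wr := &writerReporter{w: os.Stdout}
	wr.Update(cond(`service "etcd" to be "Running"   `)) // printed without the trailing spaces
	wr.Update(cond(`service "etcd" to be "Running"   `)) // suppressed as a duplicate
}
```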
diff --git a/internal/pkg/cluster/check/service.go b/internal/pkg/cluster/check/service.go
index b9c8de8e2..eb47742db 100644
--- a/internal/pkg/cluster/check/service.go
+++ b/internal/pkg/cluster/check/service.go
@@ -10,6 +10,8 @@ import (
 	"fmt"
 	"sort"
 
+	"github.com/hashicorp/go-multierror"
+
 	"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
 	"github.com/talos-systems/talos/pkg/client"
 )
@@ -26,58 +28,42 @@ func ServiceStateAssertion(ctx context.Context, cluster ClusterInfo, service str
 		return err
 	}
 
-	var node string
+	// by default, we check all control plane nodes. if some nodes don't have that service running,
+	// it won't be returned in the response
+	nodes := append(cluster.NodesByType(runtime.MachineTypeInit), cluster.NodesByType(runtime.MachineTypeControlPlane)...)
+	nodesCtx := client.WithNodes(ctx, nodes...)
 
-	switch {
-	case len(cluster.NodesByType(runtime.MachineTypeInit)) > 0:
-		nodes := cluster.NodesByType(runtime.MachineTypeInit)
-		if len(nodes) != 1 {
-			return fmt.Errorf("expected 1 init node, got %d", len(nodes))
-		}
-
-		node = nodes[0]
-	case len(cluster.NodesByType(runtime.MachineTypeControlPlane)) > 0:
-		nodes := cluster.NodesByType(runtime.MachineTypeControlPlane)
-
-		sort.Strings(nodes)
-
-		node = nodes[0]
-	default:
-		return fmt.Errorf("no bootstrap node found")
-	}
-
-	nodeCtx := client.WithNodes(ctx, node)
-
-	servicesInfo, err := cli.ServiceInfo(nodeCtx, service)
+	servicesInfo, err := cli.ServiceInfo(nodesCtx, service)
 	if err != nil {
 		return err
 	}
 
-	serviceOk := false
+	if len(servicesInfo) == 0 {
+		return ErrServiceNotFound
+	}
 
 	acceptedStates := map[string]struct{}{}
 	for _, state := range states {
 		acceptedStates[state] = struct{}{}
 	}
 
+	var multiErr *multierror.Error
+
 	for _, serviceInfo := range servicesInfo {
+		node := serviceInfo.Metadata.GetHostname()
+
 		if len(serviceInfo.Service.Events.Events) == 0 {
-			return fmt.Errorf("no events recorded yet for service %q", service)
+			multiErr = multierror.Append(multiErr, fmt.Errorf("%s: no events recorded yet for service %q", node, service))
+			continue
 		}
 
 		lastEvent := serviceInfo.Service.Events.Events[len(serviceInfo.Service.Events.Events)-1]
 		if _, ok := acceptedStates[lastEvent.State]; !ok {
-			return fmt.Errorf("service %q not in expected state %q: current state [%s] %s", service, states, lastEvent.State, lastEvent.Msg)
+			multiErr = multierror.Append(multiErr, fmt.Errorf("%s: service %q not in expected state %q: current state [%s] %s", node, service, states, lastEvent.State, lastEvent.Msg))
 		}
-
-		serviceOk = true
 	}
 
-	if !serviceOk {
-		return ErrServiceNotFound
-	}
-
-	return nil
+	return multiErr.ErrorOrNil()
 }
 
 // ServiceHealthAssertion checks whether service reached some specified state.
@@ -119,20 +105,32 @@ func ServiceHealthAssertion(ctx context.Context, cluster ClusterInfo, service st
 		return fmt.Errorf("expected a response with %d node(s), got %d", count, len(servicesInfo))
 	}
 
+	var multiErr *multierror.Error
+
+	// sort service info list so that errors returned are consistent
+	sort.Slice(servicesInfo, func(i, j int) bool {
+		return servicesInfo[i].Metadata.GetHostname() < servicesInfo[j].Metadata.GetHostname()
+	})
+
 	for _, serviceInfo := range servicesInfo {
+		node := serviceInfo.Metadata.GetHostname()
+
 		if len(serviceInfo.Service.Events.Events) == 0 {
-			return fmt.Errorf("no events recorded yet for service %q", service)
+			multiErr = multierror.Append(multiErr, fmt.Errorf("%s: no events recorded yet for service %q", node, service))
+			continue
 		}
 
 		lastEvent := serviceInfo.Service.Events.Events[len(serviceInfo.Service.Events.Events)-1]
 		if lastEvent.State != "Running" {
-			return fmt.Errorf("service %q not in expected state %q: current state [%s] %s", service, "Running", lastEvent.State, lastEvent.Msg)
+			multiErr = multierror.Append(multiErr, fmt.Errorf("%s: service %q not in expected state %q: current state [%s] %s", node, service, "Running", lastEvent.State, lastEvent.Msg))
+			continue
 		}
 
 		if !serviceInfo.Service.GetHealth().GetHealthy() {
-			return fmt.Errorf("service is not healthy: %s", service)
+			multiErr = multierror.Append(multiErr, fmt.Errorf("%s: service is not healthy: %s", node, service))
+			continue
 		}
 	}
 
-	return nil
+	return multiErr.ErrorOrNil()
 }
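Both assertions above now gather per-node failures with hashicorp/go-multierror instead of returning on the first one, so a failed check names every misbehaving node at once. The pattern in isolation (node addresses are made up for the example):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/go-multierror"
)

func main() {
	var multiErr *multierror.Error

	// Collect one error per failing node rather than bailing out early.
	for _, node := range []string{"172.20.0.2", "172.20.0.3"} {
		multiErr = multierror.Append(multiErr,
			fmt.Errorf("%s: service %q not in expected state", node, "etcd"))
	}

	// ErrorOrNil returns nil when nothing was appended, preserving the
	// plain `if err != nil` contract for callers.
	if err := multiErr.ErrorOrNil(); err != nil {
		fmt.Println(err)
	}
}
```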
diff --git a/pkg/client/events.go b/pkg/client/events.go
index d3e0c0ac9..167fc817b 100644
--- a/pkg/client/events.go
+++ b/pkg/client/events.go
@@ -73,6 +73,10 @@ func (c *Client) EventsWatch(ctx context.Context, watchFunc func(<-chan Event),
 		return fmt.Errorf("error fetching events: %s", err)
 	}
 
+	if err = stream.CloseSend(); err != nil {
+		return err
+	}
+
 	defaultNode := RemotePeer(stream.Context())
 
 	var wg sync.WaitGroup
diff --git a/pkg/client/resolver.go b/pkg/client/resolver.go
index b8e1d8f53..03d484010 100644
--- a/pkg/client/resolver.go
+++ b/pkg/client/resolver.go
@@ -6,6 +6,7 @@ package client
 import (
 	"fmt"
+	"math/rand"
 	"strings"
 
 	"google.golang.org/grpc/resolver"
@@ -28,7 +29,10 @@ func (b *talosListResolverBuilder) Build(target resolver.Target, cc resolver.Cli
 		target: target,
 		cc:     cc,
 	}
-	r.start()
+
+	if err := r.start(); err != nil {
+		return nil, err
+	}
 
 	return r, nil
 }
@@ -43,7 +47,7 @@ type talosListResolver struct {
 	cc resolver.ClientConn
 }
 
-func (r *talosListResolver) start() {
+func (r *talosListResolver) start() error {
 	var addrs []resolver.Address // nolint: prealloc
 
 	for _, a := range strings.Split(r.target.Endpoint, ",") {
 		addrs = append(addrs, resolver.Address{
 		})
 	}
 
-	r.cc.UpdateState(resolver.State{
-		Addresses: addrs,
+	// shuffle the list in case client does just one request
+	rand.Shuffle(len(addrs), func(i, j int) {
+		addrs[i], addrs[j] = addrs[j], addrs[i]
 	})
+
+	serviceConfigJSON := `{
+		"loadBalancingConfig": [{
+			"round_robin": {}
+		}]
+	}`
+
+	parsedServiceConfig := r.cc.ParseServiceConfig(serviceConfigJSON)
+
+	if parsedServiceConfig.Err != nil {
+		return parsedServiceConfig.Err
+	}
+
+	r.cc.UpdateState(resolver.State{
+		Addresses:     addrs,
+		ServiceConfig: parsedServiceConfig,
+	})
+
+	return nil
 }
 
 // ResolveNow implements resolver.Resolver.
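The resolver change is the client-side counterpart of the endpoint-list changes above: the static address list is shuffled so short-lived clients don't all start on the first endpoint, and a round_robin service config spreads subsequent calls across all of them. Roughly the same behavior can be sketched on a plain gRPC dial; the target host below is hypothetical, and port 50000 is assumed to be apid's default:

```go
package main

import (
	"log"

	"google.golang.org/grpc"
)

func main() {
	// Ask gRPC to round-robin across all resolved addresses instead of
	// pinning every call to the first healthy backend.
	conn, err := grpc.Dial(
		"dns:///apid.example.internal:50000",
		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin": {}}]}`),
		grpc.WithInsecure(), // era-appropriate for this patch; use real credentials in practice
	)
	if err != nil {
		log.Fatal(err)
	}

	defer conn.Close() // nolint: errcheck
}
```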