feat: add round-robin LB policy to Talos client by default

Handling of multiple endpoints has already been implemented in #2094.

This PR enables the round-robin policy so that grpc picks a new
endpoint for each call (instead of sending every request to the first
control plane node).

The endpoint list is randomized to handle the case when only one
request is going to be sent, so that it doesn't always go to the first
node in the list.
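
A minimal sketch of that randomization (using math/rand; `endpoints`
here is a hypothetical slice standing in for the resolver's address
list, which in the actual change is `[]resolver.Address`):

    // Shuffle endpoints in place so a single one-shot call doesn't
    // always land on the first node in the list.
    rand.Shuffle(len(endpoints), func(i, j int) {
        endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
    })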

grpc handles dead/unresponsive nodes automatically for us.
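
For illustration, a plain grpc-go client could opt into the same policy
via a default service config; this is only a sketch with a hypothetical
target address, not the exact wiring in this PR (which parses the same
service config inside the Talos endpoint-list resolver instead):

    conn, err := grpc.Dial(
        "dns:///talos.example.org:50000", // hypothetical endpoint
        grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin": {}}]}`),
        grpc.WithInsecure(), // example only; the Talos API uses TLS credentials
    )
    if err != nil {
        return err
    }
    defer conn.Close()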

`talosctl cluster create` and the provision tests were switched to use
the client-side load balancer for the Talos API.

Additional improvements we got:

* `talosctl` now reports the correct node IP when running commands
without `-n`, not the load balancer IP (if using multiple endpoints, of
course)

* a load balancer can't provide reliable error handling when the
upstream server is unresponsive or there are no upstreams available;
grpc returns much more helpful errors

Fixes #1641

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
commit 5ecddf2866 (parent 4cc074cdba)
Author: Andrey Smirnov <smirnov.andrey@gmail.com> (committed by talos-bot)
Date: 2020-07-08 21:33:19 +03:00

9 changed files with 117 additions and 70 deletions


@@ -202,7 +202,7 @@ func create(ctx context.Context) (err error) {
 		}))
 	}
 
-	defaultInternalLB, defaultExternalLB := provisioner.GetLoadBalancers(request.Network)
+	defaultInternalLB, _ := provisioner.GetLoadBalancers(request.Network)
 
 	if defaultInternalLB == "" {
 		// provisioner doesn't provide internal LB, so use first master node
@@ -218,7 +218,10 @@ func create(ctx context.Context) (err error) {
 	case forceInitNodeAsEndpoint:
 		endpointList = []string{ips[0].String()}
 	default:
-		endpointList = []string{defaultExternalLB}
+		// use control plane nodes as endpoints, client-side load-balancing
+		for i := 0; i < masters; i++ {
+			endpointList = append(endpointList, ips[i].String())
+		}
 	}
 
 	genOptions = append(genOptions, generate.WithEndpointList(endpointList))


@@ -9,6 +9,7 @@ package api
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"time"
 
 	"github.com/talos-systems/talos/api/machine"
@@ -22,6 +23,8 @@ type EventsSuite struct {
 	ctx       context.Context
 	ctxCancel context.CancelFunc
+
+	nodeCtx context.Context
 }
 
 // SuiteName ...
@@ -32,7 +35,13 @@ func (suite *EventsSuite) SuiteName() string {
 // SetupTest ...
 func (suite *EventsSuite) SetupTest() {
 	// make sure API calls have timeout
-	suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 2*time.Minute)
+	suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 30*time.Second)
+
+	nodes := suite.DiscoverNodes()
+	suite.Require().NotEmpty(nodes)
+
+	node := nodes[rand.Intn(len(nodes))]
+	suite.nodeCtx = client.WithNodes(suite.ctx, node)
 }
 
 // TearDownTest ...
@@ -44,10 +53,7 @@ func (suite *EventsSuite) TearDownTest() {
 func (suite *EventsSuite) TestServiceEvents() {
 	const service = "timed" // any restartable service should work
 
-	ctx, ctxCancel := context.WithTimeout(suite.ctx, 30*time.Second)
-	defer ctxCancel()
-
-	svcInfo, err := suite.Client.ServiceInfo(ctx, service)
+	svcInfo, err := suite.Client.ServiceInfo(suite.nodeCtx, service)
 	suite.Require().NoError(err)
 
 	if len(svcInfo) == 0 { // service is not registered (e.g. docker)
@@ -73,8 +79,8 @@ func (suite *EventsSuite) TestServiceEvents() {
 	}
 
 	go func() {
-		suite.Assert().NoError(suite.Client.EventsWatch(ctx, func(ch <-chan client.Event) {
-			defer ctxCancel()
+		suite.Assert().NoError(suite.Client.EventsWatch(suite.nodeCtx, func(ch <-chan client.Event) {
+			defer suite.ctxCancel()
 
 			for event := range ch {
 				if msg, ok := event.Payload.(*machine.ServiceStateEvent); ok && msg.GetService() == service {
@@ -91,23 +97,20 @@ func (suite *EventsSuite) TestServiceEvents() {
 	// wait for event watcher to start
 	time.Sleep(200 * time.Millisecond)
 
-	_, err = suite.Client.ServiceRestart(ctx, service)
+	_, err = suite.Client.ServiceRestart(suite.nodeCtx, service)
 	suite.Assert().NoError(err)
 
-	<-ctx.Done()
+	<-suite.ctx.Done()
 
 	suite.Require().NoError(checkExpectedActions())
 }
 
 // TestEventsWatch verifies events watch API.
 func (suite *EventsSuite) TestEventsWatch() {
-	ctx, ctxCancel := context.WithTimeout(suite.ctx, 30*time.Second)
-	defer ctxCancel()
-
 	receiveEvents := func(opts ...client.EventsOptionFunc) []client.Event {
 		result := []client.Event{}
 
-		watchCtx, watchCtxCancel := context.WithCancel(ctx)
+		watchCtx, watchCtxCancel := context.WithCancel(suite.nodeCtx)
 		defer watchCtxCancel()
 
 		suite.Assert().NoError(suite.Client.EventsWatch(watchCtx, func(ch <-chan client.Event) {


@@ -12,6 +12,7 @@ import (
 	"context"
 	"io"
 	"io/ioutil"
+	"math/rand"
 	"time"
 
 	"github.com/talos-systems/talos/api/common"
@@ -26,6 +27,8 @@ type LogsSuite struct {
 	ctx       context.Context
 	ctxCancel context.CancelFunc
+
+	nodeCtx context.Context
 }
 
 // SuiteName ...
@@ -37,6 +40,12 @@ func (suite *LogsSuite) SuiteName() string {
 func (suite *LogsSuite) SetupTest() {
 	// make sure API calls have timeout
 	suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 2*time.Minute)
+
+	nodes := suite.DiscoverNodes()
+	suite.Require().NotEmpty(nodes)
+
+	node := nodes[rand.Intn(len(nodes))]
+	suite.nodeCtx = client.WithNodes(suite.ctx, node)
 }
 
 // TearDownTest ...
@@ -46,7 +55,7 @@ func (suite *LogsSuite) TearDownTest() {
 // TestServicesHaveLogs verifies that each service has logs.
 func (suite *LogsSuite) TestServicesHaveLogs() {
-	servicesReply, err := suite.Client.ServiceList(suite.ctx)
+	servicesReply, err := suite.Client.ServiceList(suite.nodeCtx)
 	suite.Require().NoError(err)
 	suite.Require().Len(servicesReply.Messages, 1)
@@ -55,7 +64,7 @@ func (suite *LogsSuite) TestServicesHaveLogs() {
 	for _, svc := range servicesReply.Messages[0].Services {
 		logsStream, err := suite.Client.Logs(
-			suite.ctx,
+			suite.nodeCtx,
 			constants.SystemContainerdNamespace,
 			common.ContainerDriver_CONTAINERD,
 			svc.Id,
@@ -84,13 +93,13 @@ func (suite *LogsSuite) TestTail() {
 	// invoke machined enough times to generate
 	// some logs
 	for i := 0; i < 20; i++ {
-		_, err := suite.Client.Version(suite.ctx)
+		_, err := suite.Client.Version(suite.nodeCtx)
 		suite.Require().NoError(err)
 	}
 
 	for _, tailLines := range []int32{0, 1, 10} {
 		logsStream, err := suite.Client.Logs(
-			suite.ctx,
+			suite.nodeCtx,
 			constants.SystemContainerdNamespace,
 			common.ContainerDriver_CONTAINERD,
 			"apid",
@@ -122,7 +131,7 @@ func (suite *LogsSuite) TestTail() {
 // TestServiceNotFound verifies error if service name is not found.
 func (suite *LogsSuite) TestServiceNotFound() {
 	logsStream, err := suite.Client.Logs(
-		suite.ctx,
+		suite.nodeCtx,
 		constants.SystemContainerdNamespace,
 		common.ContainerDriver_CONTAINERD,
 		"nosuchservice",
@@ -133,10 +142,10 @@ func (suite *LogsSuite) TestServiceNotFound() {
 	suite.Require().NoError(logsStream.CloseSend())
 
-	_, err = logsStream.Recv()
-	suite.Require().Error(err)
-	suite.Require().Regexp(`.+log "nosuchservice" was not registered$`, err.Error())
+	msg, err := logsStream.Recv()
+	suite.Require().NoError(err)
+	suite.Require().Regexp(`.+log "nosuchservice" was not registered$`, msg.Metadata.Error)
 }
 
 // TestStreaming verifies that logs are streamed in real-time.
@@ -159,13 +168,13 @@ func (suite *LogsSuite) testStreaming(tailLines int32) {
 		// invoke osd enough times to generate
 		// some logs
 		for i := int32(0); i < tailLines; i++ {
-			_, err := suite.Client.Stats(suite.ctx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD)
+			_, err := suite.Client.Stats(suite.nodeCtx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD)
 			suite.Require().NoError(err)
 		}
 	}
 
 	logsStream, err := suite.Client.Logs(
-		suite.ctx,
+		suite.nodeCtx,
 		constants.SystemContainerdNamespace,
 		common.ContainerDriver_CONTAINERD,
 		"osd",
@@ -221,7 +230,7 @@ DrainLoop:
 	}
 
 	// invoke osd API
-	_, err = suite.Client.Stats(suite.ctx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD)
+	_, err = suite.Client.Stats(suite.nodeCtx, constants.SystemContainerdNamespace, common.ContainerDriver_CONTAINERD)
 	suite.Require().NoError(err)
 
 	// there should be a line in the logs


@@ -245,7 +245,7 @@ func (suite *UpgradeSuite) setupCluster() {
 		StateDirectory: suite.stateDir,
 	}
 
-	defaultInternalLB, defaultExternalLB := suite.provisioner.GetLoadBalancers(request.Network)
+	defaultInternalLB, _ := suite.provisioner.GetLoadBalancers(request.Network)
 
 	genOptions := suite.provisioner.GenOptions(request.Network)
@@ -256,6 +256,11 @@ func (suite *UpgradeSuite) setupCluster() {
 		genOptions = append(genOptions, generate.WithRegistryMirror(parts[0], parts[1]))
 	}
 
+	masterEndpoints := make([]string, suite.spec.MasterNodes)
+	for i := range masterEndpoints {
+		masterEndpoints[i] = ips[i].String()
+	}
+
 	suite.configBundle, err = config.NewConfigBundle(config.WithInputOptions(
 		&config.InputOptions{
 			ClusterName: clusterName,
@@ -263,7 +268,7 @@ func (suite *UpgradeSuite) setupCluster() {
 			KubeVersion: "", // keep empty so that default version is used per Talos version
 			GenOptions: append(
 				genOptions,
-				generate.WithEndpointList([]string{defaultExternalLB}),
+				generate.WithEndpointList(masterEndpoints),
 				generate.WithInstallImage(suite.spec.SourceInstallerImage),
 			),
 		}))


@@ -59,7 +59,7 @@ func DefaultClusterChecks() []ClusterCheck {
 		func(cluster ClusterInfo) conditions.Condition {
 			return conditions.PollingCondition("all control plane components to be ready", func(ctx context.Context) error {
 				return K8sFullControlPlaneAssertion(ctx, cluster)
-			}, 2*time.Minute, 5*time.Second)
+			}, 5*time.Minute, 5*time.Second)
 		},
 		// wait for kube-proxy to report ready
 		func(cluster ClusterInfo) conditions.Condition {


@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"strings"
 
 	"github.com/talos-systems/talos/internal/pkg/conditions"
 )
@@ -21,7 +22,7 @@ func (wr *writerReporter) Update(condition conditions.Condition) {
 	line := fmt.Sprintf("waiting for %s", condition)
 
 	if line != wr.lastLine {
-		fmt.Fprintln(wr.w, line)
+		fmt.Fprintln(wr.w, strings.TrimSpace(line))
 		wr.lastLine = line
 	}
 }


@@ -10,6 +10,8 @@ import (
 	"fmt"
 	"sort"
 
+	"github.com/hashicorp/go-multierror"
+
 	"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
 	"github.com/talos-systems/talos/pkg/client"
 )
@@ -26,58 +28,42 @@ func ServiceStateAssertion(ctx context.Context, cluster ClusterInfo, service str
 		return err
 	}
 
-	var node string
-
-	switch {
-	case len(cluster.NodesByType(runtime.MachineTypeInit)) > 0:
-		nodes := cluster.NodesByType(runtime.MachineTypeInit)
-
-		if len(nodes) != 1 {
-			return fmt.Errorf("expected 1 init node, got %d", len(nodes))
-		}
-
-		node = nodes[0]
-	case len(cluster.NodesByType(runtime.MachineTypeControlPlane)) > 0:
-		nodes := cluster.NodesByType(runtime.MachineTypeControlPlane)
-
-		sort.Strings(nodes)
-
-		node = nodes[0]
-	default:
-		return fmt.Errorf("no bootstrap node found")
-	}
+	// by default, we check all control plane nodes. if some nodes don't have that service running,
+	// it won't be returned in the response
+	nodes := append(cluster.NodesByType(runtime.MachineTypeInit), cluster.NodesByType(runtime.MachineTypeControlPlane)...)
 
-	nodeCtx := client.WithNodes(ctx, node)
+	nodesCtx := client.WithNodes(ctx, nodes...)
 
-	servicesInfo, err := cli.ServiceInfo(nodeCtx, service)
+	servicesInfo, err := cli.ServiceInfo(nodesCtx, service)
 	if err != nil {
 		return err
 	}
 
-	serviceOk := false
+	if len(servicesInfo) == 0 {
+		return ErrServiceNotFound
+	}
 
 	acceptedStates := map[string]struct{}{}
 	for _, state := range states {
 		acceptedStates[state] = struct{}{}
 	}
 
+	var multiErr *multierror.Error
+
 	for _, serviceInfo := range servicesInfo {
+		node := serviceInfo.Metadata.GetHostname()
+
 		if len(serviceInfo.Service.Events.Events) == 0 {
-			return fmt.Errorf("no events recorded yet for service %q", service)
+			multiErr = multierror.Append(multiErr, fmt.Errorf("%s: no events recorded yet for service %q", node, service))
+			continue
 		}
 
 		lastEvent := serviceInfo.Service.Events.Events[len(serviceInfo.Service.Events.Events)-1]
 		if _, ok := acceptedStates[lastEvent.State]; !ok {
-			return fmt.Errorf("service %q not in expected state %q: current state [%s] %s", service, states, lastEvent.State, lastEvent.Msg)
+			multiErr = multierror.Append(multiErr, fmt.Errorf("%s: service %q not in expected state %q: current state [%s] %s", node, service, states, lastEvent.State, lastEvent.Msg))
 		}
-
-		serviceOk = true
 	}
 
-	if !serviceOk {
-		return ErrServiceNotFound
-	}
-
-	return nil
+	return multiErr.ErrorOrNil()
 }
 
 // ServiceHealthAssertion checks whether service reached some specified state.
@@ -119,20 +105,32 @@ func ServiceHealthAssertion(ctx context.Context, cluster ClusterInfo, service st
 		return fmt.Errorf("expected a response with %d node(s), got %d", count, len(servicesInfo))
 	}
 
+	var multiErr *multierror.Error
+
+	// sort service info list so that errors returned are consistent
+	sort.Slice(servicesInfo, func(i, j int) bool {
+		return servicesInfo[i].Metadata.GetHostname() < servicesInfo[j].Metadata.GetHostname()
+	})
+
 	for _, serviceInfo := range servicesInfo {
+		node := serviceInfo.Metadata.GetHostname()
+
 		if len(serviceInfo.Service.Events.Events) == 0 {
-			return fmt.Errorf("no events recorded yet for service %q", service)
+			multiErr = multierror.Append(multiErr, fmt.Errorf("%s: no events recorded yet for service %q", node, service))
+			continue
 		}
 
 		lastEvent := serviceInfo.Service.Events.Events[len(serviceInfo.Service.Events.Events)-1]
 		if lastEvent.State != "Running" {
-			return fmt.Errorf("service %q not in expected state %q: current state [%s] %s", service, "Running", lastEvent.State, lastEvent.Msg)
+			multiErr = multierror.Append(multiErr, fmt.Errorf("%s: service %q not in expected state %q: current state [%s] %s", node, service, "Running", lastEvent.State, lastEvent.Msg))
+			continue
 		}
 
 		if !serviceInfo.Service.GetHealth().GetHealthy() {
-			return fmt.Errorf("service is not healthy: %s", service)
+			multiErr = multierror.Append(multiErr, fmt.Errorf("%s: service is not healthy: %s", node, service))
+			continue
 		}
 	}
 
-	return nil
+	return multiErr.ErrorOrNil()
 }


@@ -73,6 +73,10 @@ func (c *Client) EventsWatch(ctx context.Context, watchFunc func(<-chan Event),
 		return fmt.Errorf("error fetching events: %s", err)
 	}
 
+	if err = stream.CloseSend(); err != nil {
+		return err
+	}
+
 	defaultNode := RemotePeer(stream.Context())
 
 	var wg sync.WaitGroup


@@ -6,6 +6,7 @@ package client
 import (
 	"fmt"
+	"math/rand"
 	"strings"
 
 	"google.golang.org/grpc/resolver"
@@ -28,7 +29,10 @@ func (b *talosListResolverBuilder) Build(target resolver.Target, cc resolver.Cli
 		target: target,
 		cc:     cc,
 	}
-	r.start()
+
+	if err := r.start(); err != nil {
+		return nil, err
+	}
 
 	return r, nil
 }
@@ -43,7 +47,7 @@ type talosListResolver struct {
 	cc resolver.ClientConn
 }
 
-func (r *talosListResolver) start() {
+func (r *talosListResolver) start() error {
 	var addrs []resolver.Address // nolint: prealloc
 
 	for _, a := range strings.Split(r.target.Endpoint, ",") {
@@ -53,9 +57,29 @@ func (r *talosListResolver) start() {
 		})
 	}
 
+	// shuffle the list in case client does just one request
+	rand.Shuffle(len(addrs), func(i, j int) {
+		addrs[i], addrs[j] = addrs[j], addrs[i]
+	})
+
+	serviceConfigJSON := `{
+		"loadBalancingConfig": [{
+			"round_robin": {}
+		}]
+	}`
+
+	parsedServiceConfig := r.cc.ParseServiceConfig(serviceConfigJSON)
+
+	if parsedServiceConfig.Err != nil {
+		return parsedServiceConfig.Err
+	}
+
 	r.cc.UpdateState(resolver.State{
-		Addresses: addrs,
+		Addresses:     addrs,
+		ServiceConfig: parsedServiceConfig,
 	})
+
+	return nil
 }
 
 // ResolveNow implements resolver.Resolver.