diff --git a/cmd/installer/pkg/install/manifest.go b/cmd/installer/pkg/install/manifest.go index e2729b21d..4c130ee80 100644 --- a/cmd/installer/pkg/install/manifest.go +++ b/cmd/installer/pkg/install/manifest.go @@ -13,6 +13,8 @@ import ( "strings" "time" + "github.com/talos-systems/go-retry/retry" + "github.com/talos-systems/talos/internal/app/machined/pkg/runtime" "github.com/talos-systems/talos/pkg/blockdevice" "github.com/talos-systems/talos/pkg/blockdevice/filesystem/vfat" @@ -21,7 +23,6 @@ import ( "github.com/talos-systems/talos/pkg/blockdevice/table/gpt/partition" "github.com/talos-systems/talos/pkg/blockdevice/util" "github.com/talos-systems/talos/pkg/machinery/constants" - "github.com/talos-systems/talos/pkg/retry" ) // Manifest represents the instructions for preparing all block devices diff --git a/cmd/talosctl/cmd/mgmt/loadbalancer_launch.go b/cmd/talosctl/cmd/mgmt/loadbalancer_launch.go index e53133144..4c189f8a8 100644 --- a/cmd/talosctl/cmd/mgmt/loadbalancer_launch.go +++ b/cmd/talosctl/cmd/mgmt/loadbalancer_launch.go @@ -9,7 +9,7 @@ import ( "github.com/spf13/cobra" - "github.com/talos-systems/talos/internal/pkg/loadbalancer" + "github.com/talos-systems/go-loadbalancer/loadbalancer" ) var loadbalancerLaunchCmdFlags struct { diff --git a/go.mod b/go.mod index 756ff4d81..443252ebc 100644 --- a/go.mod +++ b/go.mod @@ -56,7 +56,9 @@ require ( github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2 github.com/talos-systems/bootkube-plugin v0.0.0-20200729203641-12d463a3e54e github.com/talos-systems/crypto v0.2.0 + github.com/talos-systems/go-loadbalancer v0.1.0 github.com/talos-systems/go-procfs v0.0.0-20200219015357-57c7311fdd45 + github.com/talos-systems/go-retry v0.1.0 github.com/talos-systems/go-smbios v0.0.0-20200219201045-94b8c4e489ee github.com/talos-systems/grpc-proxy v0.2.0 github.com/talos-systems/net v0.1.0 @@ -77,7 +79,6 @@ require ( gopkg.in/freddierice/go-losetup.v1 v1.0.0-20170407175016-fc9adea44124 gopkg.in/fsnotify.v1 v1.4.7 gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 - inet.af/tcpproxy v0.0.0-20200125044825-b6bb9b5b8252 k8s.io/api v0.19.0 k8s.io/apimachinery v0.19.0 k8s.io/apiserver v0.19.0 diff --git a/go.sum b/go.sum index d3151ca54..4ba6cb1fb 100644 --- a/go.sum +++ b/go.sum @@ -685,8 +685,12 @@ github.com/talos-systems/bootkube-plugin v0.0.0-20200729203641-12d463a3e54e h1:Z github.com/talos-systems/bootkube-plugin v0.0.0-20200729203641-12d463a3e54e/go.mod h1:AbdJAgHK5rJNDPUN3msPTfQJSR9b4DKb5xNN07uG8/Y= github.com/talos-systems/crypto v0.2.0 h1:UwT8uhJ0eDlklY0vYwo1+LGoFgiqkPqjQnae6j8UNYE= github.com/talos-systems/crypto v0.2.0/go.mod h1:KwqG+jANKU1FNQIapmioHQ5fkovY1DJkAqMenjYBGh0= +github.com/talos-systems/go-loadbalancer v0.1.0 h1:MQFONvSjoleU8RrKq1O1Z8CyTCJGd4SLqdAHDlR6o9s= +github.com/talos-systems/go-loadbalancer v0.1.0/go.mod h1:D5Qjfz+29WVjONWECZvOkmaLsBb3f5YeWME0u/5HmIc= github.com/talos-systems/go-procfs v0.0.0-20200219015357-57c7311fdd45 h1:FND/LgzFHTBdJBOeZVzdO6B47kxQZvSIzb9AMIXYotg= github.com/talos-systems/go-procfs v0.0.0-20200219015357-57c7311fdd45/go.mod h1:ATyUGFQIW8OnbnmvqefZWVPgL9g+CAmXHfkgny21xX8= +github.com/talos-systems/go-retry v0.1.0 h1:O+OeZR54CQ1+ch99p/81Pqi5GqJH6LIu1MTN/N0vd78= +github.com/talos-systems/go-retry v0.1.0/go.mod h1:HiXQqyVStZ35uSY/MTLWVvQVmC3lIW2MS5VdDaMtoKM= github.com/talos-systems/go-smbios v0.0.0-20200219201045-94b8c4e489ee h1:9i0ZFsjZ0wY8UUn/tk2MQshLBC0PNFJe3+84AUqzzyw= github.com/talos-systems/go-smbios v0.0.0-20200219201045-94b8c4e489ee/go.mod h1:HxhrzAoTZ7ed5Z5VvtCvnCIrOxyXDS7V2B5hCetAMW8= github.com/talos-systems/grpc-proxy v0.2.0 h1:DN75bLfaW4xfhq0r0mwFRnfGhSB+HPhK1LNzuMEs9Pw= diff --git a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go index d0499a7f0..4da45f4bc 100644 --- a/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go +++ b/internal/app/machined/pkg/runtime/v1alpha1/v1alpha1_sequencer_tasks.go @@ -28,6 +28,8 @@ import ( "golang.org/x/sys/unix" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + "github.com/talos-systems/go-retry/retry" + installer "github.com/talos-systems/talos/cmd/installer/pkg/install" "github.com/talos-systems/talos/internal/app/machined/internal/install" "github.com/talos-systems/talos/internal/app/machined/pkg/runtime" @@ -54,7 +56,6 @@ import ( "github.com/talos-systems/talos/pkg/machinery/config/configloader" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" "github.com/talos-systems/talos/pkg/machinery/constants" - "github.com/talos-systems/talos/pkg/retry" "github.com/talos-systems/talos/pkg/sysctl" "github.com/talos-systems/talos/pkg/version" ) diff --git a/internal/app/machined/pkg/system/service_runner_test.go b/internal/app/machined/pkg/system/service_runner_test.go index 123cb3ffb..7a643467f 100644 --- a/internal/app/machined/pkg/system/service_runner_test.go +++ b/internal/app/machined/pkg/system/service_runner_test.go @@ -10,11 +10,11 @@ import ( "time" "github.com/stretchr/testify/suite" + "github.com/talos-systems/go-retry/retry" "github.com/talos-systems/talos/internal/app/machined/pkg/system" "github.com/talos-systems/talos/internal/app/machined/pkg/system/events" "github.com/talos-systems/talos/pkg/conditions" - "github.com/talos-systems/talos/pkg/retry" ) type ServiceRunnerSuite struct { diff --git a/internal/app/machined/pkg/system/services/apid.go b/internal/app/machined/pkg/system/services/apid.go index 708cca9e4..dfb022653 100644 --- a/internal/app/machined/pkg/system/services/apid.go +++ b/internal/app/machined/pkg/system/services/apid.go @@ -19,6 +19,8 @@ import ( "github.com/containerd/containerd/oci" specs "github.com/opencontainers/runtime-spec/specs-go" + "github.com/talos-systems/go-retry/retry" + "github.com/talos-systems/talos/internal/app/machined/pkg/runtime" "github.com/talos-systems/talos/internal/app/machined/pkg/system/events" "github.com/talos-systems/talos/internal/app/machined/pkg/system/health" @@ -29,7 +31,6 @@ import ( "github.com/talos-systems/talos/pkg/kubernetes" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" "github.com/talos-systems/talos/pkg/machinery/constants" - "github.com/talos-systems/talos/pkg/retry" ) // APID implements the Service interface. It serves as the concrete type with diff --git a/internal/app/machined/pkg/system/services/bootkube.go b/internal/app/machined/pkg/system/services/bootkube.go index 103d9647d..188b7502a 100644 --- a/internal/app/machined/pkg/system/services/bootkube.go +++ b/internal/app/machined/pkg/system/services/bootkube.go @@ -19,6 +19,8 @@ import ( "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "github.com/talos-systems/go-retry/retry" + "github.com/talos-systems/talos/internal/app/machined/pkg/runtime" "github.com/talos-systems/talos/internal/app/machined/pkg/system/events" "github.com/talos-systems/talos/internal/app/machined/pkg/system/runner" @@ -28,7 +30,6 @@ import ( machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" "github.com/talos-systems/talos/pkg/machinery/constants" - "github.com/talos-systems/talos/pkg/retry" ) // Bootkube implements the Service interface. It serves as the concrete type with diff --git a/internal/app/machined/pkg/system/services/etcd.go b/internal/app/machined/pkg/system/services/etcd.go index cbd81bc08..387514814 100644 --- a/internal/app/machined/pkg/system/services/etcd.go +++ b/internal/app/machined/pkg/system/services/etcd.go @@ -26,7 +26,7 @@ import ( "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/talos-systems/crypto/x509" - + "github.com/talos-systems/go-retry/retry" "github.com/talos-systems/net" "github.com/talos-systems/talos/internal/app/machined/pkg/runtime" @@ -43,7 +43,6 @@ import ( "github.com/talos-systems/talos/pkg/conditions" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" "github.com/talos-systems/talos/pkg/machinery/constants" - "github.com/talos-systems/talos/pkg/retry" ) // Etcd implements the Service interface. It serves as the concrete type with diff --git a/internal/app/networkd/pkg/nic/nic.go b/internal/app/networkd/pkg/nic/nic.go index 21a500ac1..ea941b431 100644 --- a/internal/app/networkd/pkg/nic/nic.go +++ b/internal/app/networkd/pkg/nic/nic.go @@ -24,10 +24,10 @@ import ( "google.golang.org/protobuf/proto" "github.com/talos-systems/go-procfs/procfs" + "github.com/talos-systems/go-retry/retry" "github.com/talos-systems/talos/internal/app/networkd/pkg/address" "github.com/talos-systems/talos/pkg/machinery/constants" - "github.com/talos-systems/talos/pkg/retry" ) const ( diff --git a/internal/app/timed/pkg/ntp/ntp.go b/internal/app/timed/pkg/ntp/ntp.go index 2f691b18b..5482d2802 100644 --- a/internal/app/timed/pkg/ntp/ntp.go +++ b/internal/app/timed/pkg/ntp/ntp.go @@ -15,9 +15,9 @@ import ( "github.com/beevik/ntp" "github.com/hashicorp/go-multierror" + "github.com/talos-systems/go-retry/retry" "github.com/talos-systems/talos/internal/app/timed/pkg/timex" - "github.com/talos-systems/talos/pkg/retry" ) // NTP contains a server address. diff --git a/internal/integration/api/reboot.go b/internal/integration/api/reboot.go index dac11d203..df881d69f 100644 --- a/internal/integration/api/reboot.go +++ b/internal/integration/api/reboot.go @@ -13,9 +13,10 @@ import ( "testing" "time" + "github.com/talos-systems/go-retry/retry" + "github.com/talos-systems/talos/internal/integration/base" "github.com/talos-systems/talos/pkg/machinery/client" - "github.com/talos-systems/talos/pkg/retry" ) type RebootSuite struct { diff --git a/internal/integration/api/version.go b/internal/integration/api/version.go index 6d3dd0cf2..136eaea92 100644 --- a/internal/integration/api/version.go +++ b/internal/integration/api/version.go @@ -10,10 +10,11 @@ import ( "context" "time" + "github.com/talos-systems/go-retry/retry" + "github.com/talos-systems/talos/internal/integration/base" "github.com/talos-systems/talos/pkg/machinery/api/machine" "github.com/talos-systems/talos/pkg/machinery/client" - "github.com/talos-systems/talos/pkg/retry" ) // VersionSuite verifies version API. diff --git a/internal/integration/base/api.go b/internal/integration/base/api.go index 36daae9a3..6ef76bcaf 100644 --- a/internal/integration/base/api.go +++ b/internal/integration/base/api.go @@ -15,6 +15,7 @@ import ( "time" "github.com/stretchr/testify/suite" + "github.com/talos-systems/go-retry/retry" "github.com/talos-systems/talos/internal/app/machined/pkg/runtime" "github.com/talos-systems/talos/pkg/cluster" @@ -24,7 +25,6 @@ import ( "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" "github.com/talos-systems/talos/pkg/provision" "github.com/talos-systems/talos/pkg/provision/access" - "github.com/talos-systems/talos/pkg/retry" ) // APISuite is a base suite for API tests. diff --git a/internal/integration/base/cli.go b/internal/integration/base/cli.go index ddbde82ad..3ce202c5c 100644 --- a/internal/integration/base/cli.go +++ b/internal/integration/base/cli.go @@ -18,12 +18,12 @@ import ( "time" "github.com/stretchr/testify/suite" + "github.com/talos-systems/go-retry/retry" "github.com/talos-systems/talos/pkg/cluster" "github.com/talos-systems/talos/pkg/cmd" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" "github.com/talos-systems/talos/pkg/machinery/constants" - "github.com/talos-systems/talos/pkg/retry" ) // CLISuite is a base suite for CLI tests. diff --git a/internal/integration/provision/upgrade.go b/internal/integration/provision/upgrade.go index 8cdda04a2..5aa6d2d15 100644 --- a/internal/integration/provision/upgrade.go +++ b/internal/integration/provision/upgrade.go @@ -19,7 +19,7 @@ import ( "time" "github.com/stretchr/testify/suite" - + "github.com/talos-systems/go-retry/retry" talosnet "github.com/talos-systems/net" "github.com/talos-systems/talos/cmd/talosctl/pkg/mgmt/helpers" @@ -36,7 +36,6 @@ import ( "github.com/talos-systems/talos/pkg/provision" "github.com/talos-systems/talos/pkg/provision/access" "github.com/talos-systems/talos/pkg/provision/providers/qemu" - "github.com/talos-systems/talos/pkg/retry" ) type upgradeSpec struct { diff --git a/internal/pkg/containers/image/image.go b/internal/pkg/containers/image/image.go index ed405bdca..cb16befc7 100644 --- a/internal/pkg/containers/image/image.go +++ b/internal/pkg/containers/image/image.go @@ -9,10 +9,10 @@ import ( "fmt" "time" - "github.com/talos-systems/talos/pkg/machinery/config" - "github.com/talos-systems/talos/pkg/retry" - "github.com/containerd/containerd" + "github.com/talos-systems/go-retry/retry" + + "github.com/talos-systems/talos/pkg/machinery/config" ) // Pull is a convenience function that wraps the containerd image pull func with diff --git a/internal/pkg/loadbalancer/loadbalancer.go b/internal/pkg/loadbalancer/loadbalancer.go deleted file mode 100644 index 625f6f46f..000000000 --- a/internal/pkg/loadbalancer/loadbalancer.go +++ /dev/null @@ -1,92 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -// Package loadbalancer provides simple TCP loadbalancer. -package loadbalancer - -import ( - "context" - "log" - "net" - - "inet.af/tcpproxy" - - "github.com/talos-systems/talos/internal/pkg/loadbalancer/upstream" -) - -// TCP is a simple loadbalancer for TCP connections across a set of upstreams. -// -// Healthcheck is defined as TCP dial attempt by default. -// -// Zero value of TCP is a valid proxy, use `AddRoute` to install load balancer for -// address. -// -// Usage: call Run() to start lb and wait for shutdown, call Close() to shutdown lb. -type TCP struct { - tcpproxy.Proxy -} - -type lbUpstream string - -func (upstream lbUpstream) HealthCheck(ctx context.Context) error { - d := net.Dialer{} - - c, err := d.DialContext(ctx, "tcp", string(upstream)) - if err != nil { - log.Printf("healthcheck failed for %q: %s", string(upstream), err) - - return err - } - - return c.Close() -} - -type lbTarget struct { - list *upstream.List -} - -func (target *lbTarget) HandleConn(conn net.Conn) { - upstreamBackend, err := target.list.Pick() - if err != nil { - log.Printf("no upstreams available, closing connection from %s", conn.RemoteAddr()) - conn.Close() //nolint: errcheck - - return - } - - upstreamAddr := upstreamBackend.(lbUpstream) //nolint: errcheck - - log.Printf("proxying connection %s -> %s", conn.RemoteAddr(), string(upstreamAddr)) - - upstreamTarget := tcpproxy.To(string(upstreamAddr)) - upstreamTarget.OnDialError = func(src net.Conn, dstDialErr error) { - src.Close() //nolint: errcheck - - log.Printf("error dialing upstream %s: %s", string(upstreamAddr), dstDialErr) - - target.list.Down(upstreamBackend) - } - - upstreamTarget.HandleConn(conn) -} - -// AddRoute installs load balancer route from listen address ipAddr to list of upstreams. -// -// TCP automatically does background health checks for the upstreams and picks only healthy -// ones. Healthcheck is simple Dial attempt. -func (t *TCP) AddRoute(ipPort string, upstreamAddrs []string, options ...upstream.ListOption) error { - upstreams := make([]upstream.Backend, len(upstreamAddrs)) - for i := range upstreams { - upstreams[i] = lbUpstream(upstreamAddrs[i]) - } - - list, err := upstream.NewList(upstreams, options...) - if err != nil { - return err - } - - t.Proxy.AddRoute(ipPort, &lbTarget{list: list}) - - return nil -} diff --git a/internal/pkg/loadbalancer/loadbalancer_test.go b/internal/pkg/loadbalancer/loadbalancer_test.go deleted file mode 100644 index adcaa8773..000000000 --- a/internal/pkg/loadbalancer/loadbalancer_test.go +++ /dev/null @@ -1,177 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package loadbalancer_test - -import ( - "io/ioutil" - "net" - "strconv" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/suite" - - "github.com/talos-systems/talos/internal/pkg/loadbalancer" - "github.com/talos-systems/talos/internal/pkg/loadbalancer/upstream" -) - -type mockUpstream struct { - identity string - - addr string - l net.Listener -} - -func (u *mockUpstream) Start() error { - var err error - - u.l, err = net.Listen("tcp", "localhost:0") - if err != nil { - return err - } - - u.addr = u.l.Addr().String() - - go u.serve() - - return nil -} - -func (u *mockUpstream) serve() { - for { - c, err := u.l.Accept() - if err != nil { - return - } - - c.Write([]byte(u.identity)) //nolint: errcheck - c.Close() //nolint: errcheck - } -} - -func (u *mockUpstream) Close() { - u.l.Close() //nolint: errcheck -} - -func findListenAddress() (string, error) { - u := mockUpstream{} - - if err := u.Start(); err != nil { - return "", err - } - - u.Close() - - return u.addr, nil -} - -type TCPSuite struct { - suite.Suite -} - -func (suite *TCPSuite) TestBalancer() { - const ( - upstreamCount = 5 - failingUpstream = 1 - ) - - upstreams := make([]mockUpstream, upstreamCount) - for i := range upstreams { - upstreams[i].identity = strconv.Itoa(i) - suite.Require().NoError(upstreams[i].Start()) - } - - upstreamAddrs := make([]string, len(upstreams)) - for i := range upstreamAddrs { - upstreamAddrs[i] = upstreams[i].addr - } - - listenAddr, err := findListenAddress() - suite.Require().NoError(err) - - lb := &loadbalancer.TCP{} - suite.Require().NoError(lb.AddRoute( - listenAddr, - upstreamAddrs, - upstream.WithLowHighScores(-3, 3), - upstream.WithInitialScore(1), - upstream.WithScoreDeltas(-1, 1), - upstream.WithHealthcheckInterval(time.Second), - upstream.WithHealthcheckTimeout(100*time.Millisecond), - )) - - suite.Require().NoError(lb.Start()) - - var wg sync.WaitGroup - - wg.Add(1) - - go func() { - defer wg.Done() - - lb.Wait() //nolint: errcheck - }() - - for i := 0; i < 2*upstreamCount; i++ { - c, err := net.Dial("tcp", listenAddr) - suite.Require().NoError(err) - - id, err := ioutil.ReadAll(c) - suite.Require().NoError(err) - - // load balancer should go round-robin across all the upstreams - suite.Assert().Equal([]byte(strconv.Itoa(i%upstreamCount)), id) - - suite.Require().NoError(c.Close()) - } - - // bring down one upstream - upstreams[failingUpstream].Close() - - j := 0 - failedRequests := 0 - - for i := 0; i < 10*upstreamCount; i++ { - c, err := net.Dial("tcp", listenAddr) - suite.Require().NoError(err) - - id, err := ioutil.ReadAll(c) - suite.Require().NoError(err) - - if len(id) == 0 { - // hit failing upstream - suite.Assert().Equal(failingUpstream, j%upstreamCount) - - failedRequests++ - - continue - } - - if j%upstreamCount == failingUpstream { - j++ - } - - // load balancer should go round-robin across all the upstreams - suite.Assert().Equal([]byte(strconv.Itoa(j%upstreamCount)), id) - j++ - - suite.Require().NoError(c.Close()) - } - - // worst case: score = 3 (highScore) to go to -1 requires 5 requests - suite.Assert().Less(failedRequests, 5) // no more than 5 requests should fail - - suite.Require().NoError(lb.Close()) - wg.Wait() - - for i := range upstreams { - upstreams[i].Close() - } -} - -func TestTCPSuite(t *testing.T) { - suite.Run(t, new(TCPSuite)) -} diff --git a/internal/pkg/loadbalancer/upstream/upstream.go b/internal/pkg/loadbalancer/upstream/upstream.go deleted file mode 100644 index 07e2e6787..000000000 --- a/internal/pkg/loadbalancer/upstream/upstream.go +++ /dev/null @@ -1,256 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -// Package upstream provides utilities for choosing upstream backends based on score. -package upstream - -import ( - "context" - "fmt" - "sync" - "time" -) - -// Backend is an interface which should be implemented for a Pick entry. -type Backend interface { - HealthCheck(ctx context.Context) error -} - -type node struct { - backend Backend - score float64 -} - -// ListOption allows to configure List. -type ListOption func(*List) error - -// WithLowHighScores configures low and high score. -func WithLowHighScores(lowScore, highScore float64) ListOption { - return func(l *List) error { - if l.lowScore > 0 { - return fmt.Errorf("lowScore should be non-positive") - } - - if l.highScore < 0 { - return fmt.Errorf("highScore should be non-positive") - } - - if l.lowScore > l.highScore { - return fmt.Errorf("lowScore should be less or equal to highScore") - } - - l.lowScore, l.highScore = lowScore, highScore - - return nil - } -} - -// WithScoreDeltas configures fail and success score delta. -func WithScoreDeltas(failScoreDelta, successScoreDelta float64) ListOption { - return func(l *List) error { - if l.failScoreDelta >= 0 { - return fmt.Errorf("failScoreDelta should be negative") - } - - if l.successScoreDelta <= 0 { - return fmt.Errorf("successScoreDelta should be positive") - } - - l.failScoreDelta, l.successScoreDelta = failScoreDelta, successScoreDelta - - return nil - } -} - -// WithInitialScore configures initial backend score. -func WithInitialScore(initialScore float64) ListOption { - return func(l *List) error { - l.initialScore = initialScore - - return nil - } -} - -// WithHealthcheckInterval configures healthcheck interval. -func WithHealthcheckInterval(interval time.Duration) ListOption { - return func(l *List) error { - l.healthcheckInterval = interval - - return nil - } -} - -// WithHealthcheckTimeout configures healthcheck timeout (for each backend). -func WithHealthcheckTimeout(timeout time.Duration) ListOption { - return func(l *List) error { - l.healthcheckTimeout = timeout - - return nil - } -} - -// List of upstream Backends with healthchecks and different strategies to pick a node. -// -// List keeps track of Backends with score. Score is updated on health checks, and via external -// interface (e.g. when actual connection fails). -// -// Initial score is set via options (default is +1). Low and high scores defaults are (-3, +3). -// Backend score is limited by low and high scores. Each time healthcheck fails score is adjusted -// by fail delta score, and every successful check updates score by success score delta (defaults are -1/+1). -// -// Backend might be used if its score is not negative. -type List struct { - lowScore, highScore float64 - failScoreDelta, successScoreDelta float64 - initialScore float64 - - healthcheckInterval time.Duration - healthcheckTimeout time.Duration - - healthWg sync.WaitGroup - healthCtx context.Context - healthCtxCancel context.CancelFunc - - // Following fields are protected by mutex - mu sync.Mutex - - nodes []node - current int -} - -// NewList initializes new list with upstream backends and options and starts health checks. -// -// List should be stopped with `.Shutdown()`. -func NewList(upstreams []Backend, options ...ListOption) (*List, error) { - // initialize with defaults - list := &List{ - lowScore: -3.0, - highScore: 3.0, - failScoreDelta: -1.0, - successScoreDelta: 1.0, - initialScore: 1.0, - - healthcheckInterval: 1 * time.Second, - healthcheckTimeout: 100 * time.Millisecond, - - current: -1, - } - - list.healthCtx, list.healthCtxCancel = context.WithCancel(context.Background()) - - for _, opt := range options { - if err := opt(list); err != nil { - return nil, err - } - } - - list.nodes = make([]node, len(upstreams)) - - for i := range list.nodes { - list.nodes[i].backend = upstreams[i] - list.nodes[i].score = list.initialScore - } - - list.healthWg.Add(1) - - go list.healthcheck() - - return list, nil -} - -// Shutdown stops healthchecks. -func (list *List) Shutdown() { - list.healthCtxCancel() - - list.healthWg.Wait() -} - -// Up increases backend score by success score delta. -func (list *List) Up(upstream Backend) { - list.mu.Lock() - defer list.mu.Unlock() - - for i := range list.nodes { - if list.nodes[i].backend == upstream { - list.nodes[i].score += list.successScoreDelta - if list.nodes[i].score > list.highScore { - list.nodes[i].score = list.highScore - } - } - } -} - -// Down decreases backend score by fail score delta. -func (list *List) Down(upstream Backend) { - list.mu.Lock() - defer list.mu.Unlock() - - for i := range list.nodes { - if list.nodes[i].backend == upstream { - list.nodes[i].score += list.failScoreDelta - if list.nodes[i].score < list.lowScore { - list.nodes[i].score = list.lowScore - } - } - } -} - -// Pick returns next backend to be used. -// -// Default policy is to pick healthy (non-negative score) backend in -// round-robin fashion. -func (list *List) Pick() (Backend, error) { - list.mu.Lock() - defer list.mu.Unlock() - - for j := 0; j < len(list.nodes); j++ { - i := (list.current + 1 + j) % len(list.nodes) - - if list.nodes[i].score >= 0 { - list.current = i - - return list.nodes[list.current].backend, nil - } - } - - return nil, fmt.Errorf("no upstreams available") -} - -func (list *List) healthcheck() { - defer list.healthWg.Done() - - ticker := time.NewTicker(list.healthcheckInterval) - defer ticker.Stop() - - for { - list.mu.Lock() - nodes := append([]node(nil), list.nodes...) - list.mu.Unlock() - - for _, node := range nodes { - select { - case <-list.healthCtx.Done(): - return - default: - } - - func() { - ctx, ctxCancel := context.WithTimeout(list.healthCtx, list.healthcheckTimeout) - defer ctxCancel() - - if err := node.backend.HealthCheck(ctx); err != nil { - list.Down(node.backend) - } else { - list.Up(node.backend) - } - }() - } - - select { - case <-ticker.C: - case <-list.healthCtx.Done(): - return - } - } -} diff --git a/internal/pkg/loadbalancer/upstream/upstream_test.go b/internal/pkg/loadbalancer/upstream/upstream_test.go deleted file mode 100644 index 8cb5194c1..000000000 --- a/internal/pkg/loadbalancer/upstream/upstream_test.go +++ /dev/null @@ -1,192 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package upstream_test - -import ( - "context" - "errors" - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/suite" - - "github.com/talos-systems/talos/internal/pkg/loadbalancer/upstream" - "github.com/talos-systems/talos/pkg/retry" -) - -type mockBackend string - -func (b mockBackend) HealthCheck(ctx context.Context) error { - switch string(b) { - case "fail": - return errors.New("fail") - case "success": - return nil - default: - <-ctx.Done() - - return ctx.Err() - } -} - -type ListSuite struct { - suite.Suite -} - -func (suite *ListSuite) TestEmpty() { - l, err := upstream.NewList(nil) - suite.Require().NoError(err) - - defer l.Shutdown() - - backend, err := l.Pick() - suite.Assert().Nil(backend) - suite.Assert().EqualError(err, "no upstreams available") -} - -func (suite *ListSuite) TestRoundRobin() { - l, err := upstream.NewList([]upstream.Backend{mockBackend("one"), mockBackend("two"), mockBackend("three")}) - suite.Require().NoError(err) - - defer l.Shutdown() - - backend, err := l.Pick() - suite.Assert().Equal(mockBackend("one"), backend) - suite.Assert().NoError(err) - - backend, err = l.Pick() - suite.Assert().Equal(mockBackend("two"), backend) - suite.Assert().NoError(err) - - backend, err = l.Pick() - suite.Assert().Equal(mockBackend("three"), backend) - suite.Assert().NoError(err) - - backend, err = l.Pick() - suite.Assert().Equal(mockBackend("one"), backend) - suite.Assert().NoError(err) -} - -func (suite *ListSuite) TestDownUp() { - l, err := upstream.NewList( - []upstream.Backend{ - mockBackend("one"), - mockBackend("two"), - mockBackend("three"), - }, - upstream.WithLowHighScores(-3, 3), - upstream.WithInitialScore(1), - upstream.WithScoreDeltas(-1, 1), - upstream.WithHealthcheckInterval(time.Hour), - ) - suite.Require().NoError(err) - - defer l.Shutdown() - - backend, err := l.Pick() - suite.Assert().Equal(mockBackend("one"), backend) - suite.Assert().NoError(err) - - l.Down(mockBackend("two")) // score == 0 - l.Down(mockBackend("two")) // score == -1 - l.Down(mockBackend("three")) // score == 0 - - backend, err = l.Pick() - suite.Assert().Equal(mockBackend("three"), backend) - suite.Assert().NoError(err) - - backend, err = l.Pick() - suite.Assert().Equal(mockBackend("one"), backend) - suite.Assert().NoError(err) - - backend, err = l.Pick() - suite.Assert().Equal(mockBackend("three"), backend) - suite.Assert().NoError(err) - - l.Down(mockBackend("three")) // score == -1 - l.Up(mockBackend("two")) // score == 0 - l.Up(mockBackend("two")) // score == 1 - l.Up(mockBackend("two")) // score == 2 - l.Up(mockBackend("two")) // score == 3 - l.Up(mockBackend("two")) // score == 3 (capped at highScore) - - backend, err = l.Pick() - suite.Assert().Equal(mockBackend("one"), backend) - suite.Assert().NoError(err) - - backend, err = l.Pick() - suite.Assert().Equal(mockBackend("two"), backend) - suite.Assert().NoError(err) - - backend, err = l.Pick() - suite.Assert().Equal(mockBackend("one"), backend) - suite.Assert().NoError(err) - - l.Down(mockBackend("two")) // score == 2 - l.Down(mockBackend("two")) // score == 1 - l.Down(mockBackend("two")) // score == 0 - l.Down(mockBackend("two")) // score == -1 - - backend, err = l.Pick() - suite.Assert().Equal(mockBackend("one"), backend) - suite.Assert().NoError(err) - - l.Down(mockBackend("two")) // score == -2 - l.Down(mockBackend("two")) // score == -3 - l.Down(mockBackend("two")) // score == -3 (capped at lowScore) - - backend, err = l.Pick() - suite.Assert().Equal(mockBackend("one"), backend) - suite.Assert().NoError(err) - - l.Up(mockBackend("two")) // score == -2 - l.Up(mockBackend("two")) // score == -1 - l.Up(mockBackend("two")) // score == 0 - - backend, err = l.Pick() - suite.Assert().Equal(mockBackend("two"), backend) - suite.Assert().NoError(err) -} - -func (suite *ListSuite) TestHealthcheck() { - l, err := upstream.NewList( - []upstream.Backend{ - mockBackend("success"), - mockBackend("fail"), - mockBackend("timeout"), - }, - upstream.WithLowHighScores(-1, 1), - upstream.WithInitialScore(1), - upstream.WithScoreDeltas(-1, 1), - upstream.WithHealthcheckInterval(10*time.Millisecond), - upstream.WithHealthcheckTimeout(time.Millisecond), - ) - suite.Require().NoError(err) - - defer l.Shutdown() - - time.Sleep(20 * time.Millisecond) // let healthchecks run - - // when health info converges, "success" should be the only backend left - suite.Require().NoError(retry.Constant(time.Second, retry.WithUnits(time.Millisecond)).Retry(func() error { - for i := 0; i < 10; i++ { - backend, err := l.Pick() - if err != nil { - return retry.UnexpectedError(err) - } - - if backend.(mockBackend) != mockBackend("success") { - return retry.ExpectedError(fmt.Errorf("unexpected %v", backend)) - } - } - - return nil - })) -} - -func TestListSuite(t *testing.T) { - suite.Run(t, new(ListSuite)) -} diff --git a/internal/pkg/mount/mount.go b/internal/pkg/mount/mount.go index e2fee5ff2..cdca8f410 100644 --- a/internal/pkg/mount/mount.go +++ b/internal/pkg/mount/mount.go @@ -13,6 +13,7 @@ import ( "strings" "time" + "github.com/talos-systems/go-retry/retry" "golang.org/x/sys/unix" "github.com/talos-systems/talos/pkg/blockdevice" @@ -20,7 +21,6 @@ import ( gptpartition "github.com/talos-systems/talos/pkg/blockdevice/table/gpt/partition" "github.com/talos-systems/talos/pkg/blockdevice/util" "github.com/talos-systems/talos/pkg/machinery/constants" - "github.com/talos-systems/talos/pkg/retry" ) // RetryFunc defines the requirements for retrying a mount point operation. diff --git a/pkg/blockdevice/blkpg/blkpg_linux.go b/pkg/blockdevice/blkpg/blkpg_linux.go index 395319ff3..96f67f1cc 100644 --- a/pkg/blockdevice/blkpg/blkpg_linux.go +++ b/pkg/blockdevice/blkpg/blkpg_linux.go @@ -11,11 +11,11 @@ import ( "time" "unsafe" + "github.com/talos-systems/go-retry/retry" "golang.org/x/sys/unix" "github.com/talos-systems/talos/pkg/blockdevice/lba" "github.com/talos-systems/talos/pkg/blockdevice/table" - "github.com/talos-systems/talos/pkg/retry" ) // InformKernelOfAdd invokes the BLKPG_ADD_PARTITION ioctl. diff --git a/pkg/blockdevice/blockdevice_linux.go b/pkg/blockdevice/blockdevice_linux.go index e73469909..7f7963bd1 100644 --- a/pkg/blockdevice/blockdevice_linux.go +++ b/pkg/blockdevice/blockdevice_linux.go @@ -12,11 +12,11 @@ import ( "time" "unsafe" + "github.com/talos-systems/go-retry/retry" + "golang.org/x/sys/unix" + "github.com/talos-systems/talos/pkg/blockdevice/table" "github.com/talos-systems/talos/pkg/blockdevice/table/gpt" - "github.com/talos-systems/talos/pkg/retry" - - "golang.org/x/sys/unix" ) // BlockDevice represents a block device. diff --git a/pkg/blockdevice/probe/probe.go b/pkg/blockdevice/probe/probe.go index 1d07c1e3a..925663d1d 100644 --- a/pkg/blockdevice/probe/probe.go +++ b/pkg/blockdevice/probe/probe.go @@ -15,6 +15,9 @@ import ( "path/filepath" "time" + "github.com/talos-systems/go-retry/retry" + "golang.org/x/sys/unix" + "github.com/talos-systems/talos/pkg/blockdevice" "github.com/talos-systems/talos/pkg/blockdevice/filesystem" "github.com/talos-systems/talos/pkg/blockdevice/filesystem/iso9660" @@ -22,9 +25,6 @@ import ( "github.com/talos-systems/talos/pkg/blockdevice/filesystem/xfs" gptpartition "github.com/talos-systems/talos/pkg/blockdevice/table/gpt/partition" "github.com/talos-systems/talos/pkg/blockdevice/util" - "github.com/talos-systems/talos/pkg/retry" - - "golang.org/x/sys/unix" ) // ProbedBlockDevice represents a probed block device. diff --git a/pkg/cluster/bootstrap.go b/pkg/cluster/bootstrap.go index 952139223..8cc446f0a 100644 --- a/pkg/cluster/bootstrap.go +++ b/pkg/cluster/bootstrap.go @@ -11,9 +11,10 @@ import ( "sort" "time" + "github.com/talos-systems/go-retry/retry" + "github.com/talos-systems/talos/pkg/machinery/client" "github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine" - "github.com/talos-systems/talos/pkg/retry" ) // APIBoostrapper bootstraps cluster via Talos API. diff --git a/pkg/download/download.go b/pkg/download/download.go index ae67e3b07..0ef35b20d 100644 --- a/pkg/download/download.go +++ b/pkg/download/download.go @@ -12,7 +12,7 @@ import ( "net/url" "time" - "github.com/talos-systems/talos/pkg/retry" + "github.com/talos-systems/go-retry/retry" ) const b64 = "base64" diff --git a/pkg/kubernetes/kubernetes.go b/pkg/kubernetes/kubernetes.go index c25383840..9aafd7d47 100644 --- a/pkg/kubernetes/kubernetes.go +++ b/pkg/kubernetes/kubernetes.go @@ -29,9 +29,9 @@ import ( "k8s.io/client-go/tools/clientcmd" "github.com/talos-systems/crypto/x509" + "github.com/talos-systems/go-retry/retry" "github.com/talos-systems/talos/pkg/machinery/constants" - "github.com/talos-systems/talos/pkg/retry" ) // Client represents a set of helper methods for interacting with the diff --git a/pkg/retry/constant.go b/pkg/retry/constant.go deleted file mode 100644 index a34dfd1d6..000000000 --- a/pkg/retry/constant.go +++ /dev/null @@ -1,57 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package retry - -import ( - "time" -) - -type constantRetryer struct { - retryer -} - -// ConstantTicker represents a ticker with a constant algorithm. -type ConstantTicker struct { - ticker -} - -// Constant initializes and returns a constant Retryer. -func Constant(duration time.Duration, setters ...Option) Retryer { - opts := NewDefaultOptions(setters...) - - return constantRetryer{ - retryer: retryer{ - duration: duration, - options: opts, - }, - } -} - -// NewConstantTicker is a ticker that sends the time on a channel using a -// constant algorithm. -func NewConstantTicker(opts *Options) *ConstantTicker { - l := &ConstantTicker{ - ticker: ticker{ - C: make(chan time.Time, 1), - options: opts, - s: make(chan struct{}, 1), - }, - } - - return l -} - -// Retry implements the Retryer interface. -func (c constantRetryer) Retry(f RetryableFunc) error { - tick := NewConstantTicker(c.options) - defer tick.Stop() - - return retry(f, c.duration, tick) -} - -// Tick implements the Ticker interface. -func (c ConstantTicker) Tick() time.Duration { - return c.options.Units + c.Jitter() -} diff --git a/pkg/retry/constant_test.go b/pkg/retry/constant_test.go deleted file mode 100644 index eb9095634..000000000 --- a/pkg/retry/constant_test.go +++ /dev/null @@ -1,172 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -//nolint: testpackage -package retry - -import ( - "fmt" - "testing" - "time" -) - -// nolint: scopelint -func Test_constantRetryer_Retry(t *testing.T) { - type fields struct { - retryer retryer - } - - type args struct { - f RetryableFunc - } - - count := 0 - - tests := []struct { - name string - fields fields - args args - expectedCount int - wantErr bool - }{ - { - name: "test expected number of retries", - fields: fields{ - retryer: retryer{ - duration: 2500 * time.Millisecond, - options: NewDefaultOptions(), - }, - }, - args: args{ - f: func() error { - count++ - return ExpectedError(fmt.Errorf("expected")) - }, - }, - expectedCount: 3, - wantErr: true, - }, - { - name: "test expected number of retries with units", - fields: fields{ - retryer: retryer{ - duration: 2250 * time.Millisecond, - options: NewDefaultOptions(WithUnits(500 * time.Millisecond)), - }, - }, - args: args{ - f: func() error { - count++ - return ExpectedError(fmt.Errorf("expected")) - }, - }, - expectedCount: 5, - wantErr: true, - }, - { - name: "test unexpected error", - fields: fields{ - retryer: retryer{ - duration: 2 * time.Second, - options: NewDefaultOptions(), - }, - }, - args: args{ - f: func() error { - count++ - return UnexpectedError(fmt.Errorf("unexpected")) - }, - }, - expectedCount: 1, - wantErr: true, - }, - { - name: "test conditional unexpected error", - fields: fields{ - retryer: retryer{ - duration: 10 * time.Second, - options: NewDefaultOptions(), - }, - }, - args: args{ - f: func() error { - count++ - if count == 2 { - return UnexpectedError(fmt.Errorf("unexpected")) - } - return ExpectedError(fmt.Errorf("unexpected")) - }, - }, - expectedCount: 2, - wantErr: true, - }, - { - name: "test conditional no error", - fields: fields{ - retryer: retryer{ - duration: 10 * time.Second, - options: NewDefaultOptions(), - }, - }, - args: args{ - f: func() error { - count++ - if count == 2 { - return nil - } - return ExpectedError(fmt.Errorf("unexpected")) - }, - }, - expectedCount: 2, - wantErr: false, - }, - { - name: "no error", - fields: fields{ - retryer: retryer{ - duration: 1 * time.Second, - options: NewDefaultOptions(), - }, - }, - args: args{ - f: func() error { - return nil - }, - }, - expectedCount: 0, - wantErr: false, - }, - { - name: "test timeout", - fields: fields{ - retryer: retryer{ - duration: 1 * time.Second, - options: NewDefaultOptions(WithUnits(10 * time.Second)), - }, - }, - args: args{ - f: func() error { - count++ - return ExpectedError(fmt.Errorf("expected")) - }, - }, - expectedCount: 1, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := constantRetryer{ - retryer: tt.fields.retryer, - } - count = 0 - if err := e.Retry(tt.args.f); (err != nil) != tt.wantErr { - t.Errorf("constantRetryer.Retry() error = %v, wantErr %v", err, tt.wantErr) - } - if count != tt.expectedCount { - t.Errorf("expected count of %d, got %d", tt.expectedCount, count) - } - }) - } -} diff --git a/pkg/retry/exponential.go b/pkg/retry/exponential.go deleted file mode 100644 index caaa670be..000000000 --- a/pkg/retry/exponential.go +++ /dev/null @@ -1,66 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package retry - -import ( - "math" - "time" -) - -type exponentialRetryer struct { - retryer -} - -// ExponentialTicker represents a ticker with a truncated exponential algorithm. -// Please see https://en.wikipedia.org/wiki/Exponential_backoff for details on -// the algorithm. -type ExponentialTicker struct { - ticker - - c float64 -} - -// Exponential initializes and returns a truncated exponential Retryer. -func Exponential(duration time.Duration, setters ...Option) Retryer { - opts := NewDefaultOptions(setters...) - - return exponentialRetryer{ - retryer: retryer{ - duration: duration, - options: opts, - }, - } -} - -// NewExponentialTicker is a ticker that sends the time on a channel using a -// truncated exponential algorithm. -func NewExponentialTicker(opts *Options) *ExponentialTicker { - e := &ExponentialTicker{ - ticker: ticker{ - C: make(chan time.Time, 1), - options: opts, - s: make(chan struct{}, 1), - }, - c: 1.0, - } - - return e -} - -// Retry implements the Retryer interface. -func (e exponentialRetryer) Retry(f RetryableFunc) error { - tick := NewExponentialTicker(e.options) - defer tick.Stop() - - return retry(f, e.duration, tick) -} - -// Tick implements the Ticker interface. -func (e *ExponentialTicker) Tick() time.Duration { - d := time.Duration((math.Pow(2, e.c)-1)/2)*e.options.Units + e.Jitter() - e.c++ - - return d -} diff --git a/pkg/retry/exponential_test.go b/pkg/retry/exponential_test.go deleted file mode 100644 index 7eabd4fea..000000000 --- a/pkg/retry/exponential_test.go +++ /dev/null @@ -1,172 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -//nolint: testpackage -package retry - -import ( - "fmt" - "testing" - "time" -) - -// nolint: scopelint -func Test_exponentialRetryer_Retry(t *testing.T) { - type fields struct { - retryer retryer - } - - type args struct { - f RetryableFunc - } - - count := 0 - - tests := []struct { - name string - fields fields - args args - expectedCount int - wantErr bool - }{ - { - name: "test expected number of retries", - fields: fields{ - retryer: retryer{ - duration: 1 * time.Second, - options: NewDefaultOptions(WithUnits(100 * time.Millisecond)), - }, - }, - args: args{ - f: func() error { - count++ - return ExpectedError(fmt.Errorf("expected")) - }, - }, - expectedCount: 4, - wantErr: true, - }, - { - name: "test expected number of retries with units", - fields: fields{ - retryer: retryer{ - duration: 1 * time.Second, - options: NewDefaultOptions(WithUnits(50 * time.Millisecond)), - }, - }, - args: args{ - f: func() error { - count++ - return ExpectedError(fmt.Errorf("expected")) - }, - }, - expectedCount: 5, - wantErr: true, - }, - { - name: "test unexpected error", - fields: fields{ - retryer: retryer{ - duration: 2 * time.Second, - options: NewDefaultOptions(), - }, - }, - args: args{ - f: func() error { - count++ - return UnexpectedError(fmt.Errorf("unexpected")) - }, - }, - expectedCount: 1, - wantErr: true, - }, - { - name: "test conditional unexpected error", - fields: fields{ - retryer: retryer{ - duration: 10 * time.Second, - options: NewDefaultOptions(), - }, - }, - args: args{ - f: func() error { - count++ - if count == 2 { - return UnexpectedError(fmt.Errorf("unexpected")) - } - return ExpectedError(fmt.Errorf("unexpected")) - }, - }, - expectedCount: 2, - wantErr: true, - }, - { - name: "test conditional no error", - fields: fields{ - retryer: retryer{ - duration: 10 * time.Second, - options: NewDefaultOptions(), - }, - }, - args: args{ - f: func() error { - count++ - if count == 2 { - return nil - } - return ExpectedError(fmt.Errorf("unexpected")) - }, - }, - expectedCount: 2, - wantErr: false, - }, - { - name: "no error", - fields: fields{ - retryer: retryer{ - duration: 1 * time.Second, - options: NewDefaultOptions(), - }, - }, - args: args{ - f: func() error { - return nil - }, - }, - expectedCount: 0, - wantErr: false, - }, - { - name: "test timeout", - fields: fields{ - retryer: retryer{ - duration: 1 * time.Second, - options: NewDefaultOptions(WithUnits(10 * time.Second)), - }, - }, - args: args{ - f: func() error { - count++ - return ExpectedError(fmt.Errorf("expected")) - }, - }, - expectedCount: 2, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := exponentialRetryer{ - retryer: tt.fields.retryer, - } - count = 0 - if err := e.Retry(tt.args.f); (err != nil) != tt.wantErr { - t.Errorf("exponentialRetryer.Retry() error = %v, wantErr %v", err, tt.wantErr) - } - if count != tt.expectedCount { - t.Errorf("expected count of %d, got %d", tt.expectedCount, count) - } - }) - } -} diff --git a/pkg/retry/linear.go b/pkg/retry/linear.go deleted file mode 100644 index e34c76506..000000000 --- a/pkg/retry/linear.go +++ /dev/null @@ -1,63 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package retry - -import ( - "time" -) - -type linearRetryer struct { - retryer -} - -// LinearTicker represents a ticker with a linear algorithm. -type LinearTicker struct { - ticker - - c int -} - -// Linear initializes and returns a linear Retryer. -func Linear(duration time.Duration, setters ...Option) Retryer { - opts := NewDefaultOptions(setters...) - - return linearRetryer{ - retryer: retryer{ - duration: duration, - options: opts, - }, - } -} - -// NewLinearTicker is a ticker that sends the time on a channel using a -// linear algorithm. -func NewLinearTicker(opts *Options) *LinearTicker { - l := &LinearTicker{ - ticker: ticker{ - C: make(chan time.Time, 1), - options: opts, - s: make(chan struct{}, 1), - }, - c: 1, - } - - return l -} - -// Retry implements the Retryer interface. -func (l linearRetryer) Retry(f RetryableFunc) error { - tick := NewLinearTicker(l.options) - defer tick.Stop() - - return retry(f, l.duration, tick) -} - -// Tick implements the Ticker interface. -func (l *LinearTicker) Tick() time.Duration { - d := time.Duration(l.c)*l.options.Units + l.Jitter() - l.c++ - - return d -} diff --git a/pkg/retry/linear_test.go b/pkg/retry/linear_test.go deleted file mode 100644 index 34bea1a6d..000000000 --- a/pkg/retry/linear_test.go +++ /dev/null @@ -1,172 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -//nolint: testpackage -package retry - -import ( - "fmt" - "testing" - "time" -) - -// nolint: scopelint -func Test_linearRetryer_Retry(t *testing.T) { - type fields struct { - retryer retryer - } - - type args struct { - f RetryableFunc - } - - count := 0 - - tests := []struct { - name string - fields fields - args args - expectedCount int - wantErr bool - }{ - { - name: "test expected number of retries", - fields: fields{ - retryer: retryer{ - duration: 1*time.Second + 200*time.Millisecond, - options: NewDefaultOptions(WithUnits(100 * time.Millisecond)), - }, - }, - args: args{ - f: func() error { - count++ - return ExpectedError(fmt.Errorf("expected")) - }, - }, - expectedCount: 5, - wantErr: true, - }, - { - name: "test expected number of retries with units", - fields: fields{ - retryer: retryer{ - duration: 2 * time.Second, - options: NewDefaultOptions(WithUnits(50 * time.Millisecond)), - }, - }, - args: args{ - f: func() error { - count++ - return ExpectedError(fmt.Errorf("expected")) - }, - }, - expectedCount: 9, - wantErr: true, - }, - { - name: "test unexpected error", - fields: fields{ - retryer: retryer{ - duration: 2 * time.Second, - options: NewDefaultOptions(), - }, - }, - args: args{ - f: func() error { - count++ - return UnexpectedError(fmt.Errorf("unexpected")) - }, - }, - expectedCount: 1, - wantErr: true, - }, - { - name: "test conditional unexpected error", - fields: fields{ - retryer: retryer{ - duration: 10 * time.Second, - options: NewDefaultOptions(), - }, - }, - args: args{ - f: func() error { - count++ - if count == 1 { - return UnexpectedError(fmt.Errorf("unexpected")) - } - return ExpectedError(fmt.Errorf("unexpected")) - }, - }, - expectedCount: 1, - wantErr: true, - }, - { - name: "test conditional no error", - fields: fields{ - retryer: retryer{ - duration: 10 * time.Second, - options: NewDefaultOptions(), - }, - }, - args: args{ - f: func() error { - count++ - if count == 2 { - return nil - } - return ExpectedError(fmt.Errorf("unexpected")) - }, - }, - expectedCount: 2, - wantErr: false, - }, - { - name: "no error", - fields: fields{ - retryer: retryer{ - duration: 1 * time.Second, - options: NewDefaultOptions(), - }, - }, - args: args{ - f: func() error { - return nil - }, - }, - expectedCount: 0, - wantErr: false, - }, - { - name: "test timeout", - fields: fields{ - retryer: retryer{ - duration: 1 * time.Second, - options: NewDefaultOptions(WithUnits(10 * time.Second)), - }, - }, - args: args{ - f: func() error { - count++ - return ExpectedError(fmt.Errorf("expected")) - }, - }, - expectedCount: 1, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - l := linearRetryer{ - retryer: tt.fields.retryer, - } - count = 0 - if err := l.Retry(tt.args.f); (err != nil) != tt.wantErr { - t.Errorf("linearRetryer.Retry() error = %v, wantErr %v", err, tt.wantErr) - } - if count != tt.expectedCount { - t.Errorf("expected count of %d, got %d", tt.expectedCount, count) - } - }) - } -} diff --git a/pkg/retry/options.go b/pkg/retry/options.go deleted file mode 100644 index 1277837c8..000000000 --- a/pkg/retry/options.go +++ /dev/null @@ -1,44 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package retry - -import "time" - -// Options is the functional options struct. -type Options struct { - Units time.Duration - Jitter time.Duration -} - -// Option is the functional option func. -type Option func(*Options) - -// WithUnits is a functional option for setting the units of the ticker. -func WithUnits(o time.Duration) Option { - return func(args *Options) { - args.Units = o - } -} - -// WithJitter is a functional option for setting the jitter flag. -func WithJitter(o time.Duration) Option { - return func(args *Options) { - args.Jitter = o - } -} - -// NewDefaultOptions initializes a Options struct with default values. -func NewDefaultOptions(setters ...Option) *Options { - opts := &Options{ - Units: time.Second, - Jitter: time.Duration(0), - } - - for _, setter := range setters { - setter(opts) - } - - return opts -} diff --git a/pkg/retry/options_test.go b/pkg/retry/options_test.go deleted file mode 100644 index 685b9d72d..000000000 --- a/pkg/retry/options_test.go +++ /dev/null @@ -1,52 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -//nolint: testpackage -package retry - -import ( - "reflect" - "testing" - "time" -) - -// nolint: scopelint -func TestNewDefaultOptions(t *testing.T) { - type args struct { - setters []Option - } - - tests := []struct { - name string - args args - want *Options - }{ - { - name: "with options", - args: args{ - setters: []Option{WithUnits(time.Millisecond)}, - }, - want: &Options{ - Units: time.Millisecond, - }, - }, - { - name: "default", - args: args{ - setters: []Option{}, - }, - want: &Options{ - Units: time.Second, - }, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := NewDefaultOptions(tt.args.setters...); !reflect.DeepEqual(got, tt.want) { - t.Errorf("NewDefaultOptions() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go deleted file mode 100644 index 5dde3c335..000000000 --- a/pkg/retry/retry.go +++ /dev/null @@ -1,181 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package retry - -import ( - "fmt" - "math/rand" - "sync" - "time" -) - -// RetryableFunc represents a function that can be retried. -type RetryableFunc func() error - -// Retryer defines the requirements for retrying a function. -type Retryer interface { - Retry(RetryableFunc) error -} - -// Ticker defines the requirements for providing a clock to the retry logic. -type Ticker interface { - Tick() time.Duration - StopChan() <-chan struct{} - Stop() -} - -// ErrorSet represents a set of unique errors. -type ErrorSet struct { - errs []error - - mu sync.Mutex -} - -func (e *ErrorSet) Error() string { - if len(e.errs) == 0 { - return "" - } - - errString := fmt.Sprintf("%d error(s) occurred:", len(e.errs)) - for _, err := range e.errs { - errString = fmt.Sprintf("%s\n\t%s", errString, err) - } - - return errString -} - -// Append adds the error to the set if the error is not already present. -func (e *ErrorSet) Append(err error) error { - e.mu.Lock() - defer e.mu.Unlock() - - if e.errs == nil { - e.errs = []error{} - } - - ok := false - - for _, existingErr := range e.errs { - if err.Error() == existingErr.Error() { - ok = true - break - } - } - - if !ok { - e.errs = append(e.errs, err) - } - - return e -} - -// TimeoutError represents a timeout error. -type TimeoutError struct{} - -func (TimeoutError) Error() string { - return "timeout" -} - -// IsTimeout reutrns if the provided error is a timeout error. -func IsTimeout(err error) bool { - _, ok := err.(TimeoutError) - - return ok -} - -type expectedError struct{ error } - -type unexpectedError struct{ error } - -type retryer struct { - duration time.Duration - options *Options -} - -type ticker struct { - C chan time.Time - options *Options - rand *rand.Rand - s chan struct{} -} - -func (t ticker) Jitter() time.Duration { - if int(t.options.Jitter) == 0 { - return time.Duration(0) - } - - if t.rand == nil { - t.rand = rand.New(rand.NewSource(time.Now().UnixNano())) - } - - return time.Duration(t.rand.Int63n(int64(t.options.Jitter))) -} - -func (t ticker) StopChan() <-chan struct{} { - return t.s -} - -func (t ticker) Stop() { - t.s <- struct{}{} -} - -// ExpectedError error represents an error that is expected by the retrying -// function. This error is ignored. -func ExpectedError(err error) error { - if err == nil { - return nil - } - - return expectedError{err} -} - -// UnexpectedError error represents an error that is unexpected by the retrying -// function. This error is fatal. -func UnexpectedError(err error) error { - if err == nil { - return nil - } - - return unexpectedError{err} -} - -func retry(f RetryableFunc, d time.Duration, t Ticker) error { - timer := time.NewTimer(d) - defer timer.Stop() - - errs := &ErrorSet{} - - // We run the func first to avoid having to wait for the next tick. - if err := f(); err != nil { - if _, ok := err.(unexpectedError); ok { - return errs.Append(err) - } - } else { - return nil - } - - for { - select { - case <-timer.C: - return errs.Append(TimeoutError{}) - case <-t.StopChan(): - return nil - case <-time.After(t.Tick()): - } - - if err := f(); err != nil { - switch err.(type) { - case expectedError: - // nolint: errcheck - errs.Append(err) - continue - case unexpectedError: - return errs.Append(err) - } - } - - return nil - } -} diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go deleted file mode 100644 index 29cc43b63..000000000 --- a/pkg/retry/retry_test.go +++ /dev/null @@ -1,63 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -//nolint: testpackage -package retry - -import ( - "errors" - "testing" - "time" -) - -// nolint: scopelint -func Test_retry(t *testing.T) { - type args struct { - f RetryableFunc - d time.Duration - t Ticker - } - - tests := []struct { - name string - args args - wantString string - }{ - { - name: "expected error string", - args: args{ - f: func() error { return ExpectedError(errors.New("test")) }, - d: 2 * time.Second, - t: NewConstantTicker(NewDefaultOptions()), - }, - wantString: "2 error(s) occurred:\n\ttest\n\ttimeout", - }, - { - name: "unexpected error string", - args: args{ - f: func() error { return UnexpectedError(errors.New("test")) }, - d: 2 * time.Second, - t: NewConstantTicker(NewDefaultOptions()), - }, - wantString: "1 error(s) occurred:\n\ttest", - }, - { - name: "no error string", - args: args{ - f: func() error { return nil }, - d: 2 * time.Second, - t: NewConstantTicker(NewDefaultOptions()), - }, - wantString: "", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if err := retry(tt.args.f, tt.args.d, tt.args.t); err != nil && tt.wantString != err.Error() { - t.Errorf("retry() error = %q\nwant:\n%q", err, tt.wantString) - } - }) - } -}