diff --git a/cmd/talosctl/cmd/mgmt/cluster/create.go b/cmd/talosctl/cmd/mgmt/cluster/create.go index 442f34a42..39c258df9 100644 --- a/cmd/talosctl/cmd/mgmt/cluster/create.go +++ b/cmd/talosctl/cmd/mgmt/cluster/create.go @@ -20,6 +20,7 @@ import ( humanize "github.com/dustin/go-humanize" "github.com/siderolabs/go-blockdevice/blockdevice/encryption" + "github.com/siderolabs/go-kubeconfig" "github.com/spf13/cobra" "github.com/spf13/pflag" "github.com/talos-systems/go-procfs/procfs" @@ -30,7 +31,6 @@ import ( "github.com/talos-systems/talos/pkg/cli" "github.com/talos-systems/talos/pkg/cluster/check" "github.com/talos-systems/talos/pkg/images" - "github.com/talos-systems/talos/pkg/kubeconfig" clientconfig "github.com/talos-systems/talos/pkg/machinery/client/config" "github.com/talos-systems/talos/pkg/machinery/config" "github.com/talos-systems/talos/pkg/machinery/config/configpatcher" diff --git a/cmd/talosctl/cmd/talos/kubeconfig.go b/cmd/talosctl/cmd/talos/kubeconfig.go index b5018a38d..638b7eb38 100644 --- a/cmd/talosctl/cmd/talos/kubeconfig.go +++ b/cmd/talosctl/cmd/talos/kubeconfig.go @@ -14,11 +14,11 @@ import ( "sync" "github.com/mattn/go-isatty" + "github.com/siderolabs/go-kubeconfig" "github.com/spf13/cobra" "k8s.io/client-go/tools/clientcmd" "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" - "github.com/talos-systems/talos/pkg/kubeconfig" "github.com/talos-systems/talos/pkg/machinery/client" ) diff --git a/cmd/talosctl/pkg/talos/action/node.go b/cmd/talosctl/pkg/talos/action/node.go index 0bb976166..4d3767a92 100644 --- a/cmd/talosctl/pkg/talos/action/node.go +++ b/cmd/talosctl/pkg/talos/action/node.go @@ -11,12 +11,12 @@ import ( "io" "strings" + "github.com/siderolabs/go-circular" "github.com/talos-systems/go-retry/retry" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" - "github.com/talos-systems/talos/pkg/circular" "github.com/talos-systems/talos/pkg/machinery/api/common" machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine" "github.com/talos-systems/talos/pkg/machinery/client" diff --git a/cmd/talosctl/pkg/talos/action/tracker.go b/cmd/talosctl/pkg/talos/action/tracker.go index 00c96e929..2a5240680 100644 --- a/cmd/talosctl/pkg/talos/action/tracker.go +++ b/cmd/talosctl/pkg/talos/action/tracker.go @@ -16,13 +16,13 @@ import ( "github.com/mattn/go-isatty" "github.com/siderolabs/gen/maps" + "github.com/siderolabs/go-circular" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/backoff" "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/global" "github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers" - "github.com/talos-systems/talos/pkg/circular" machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine" "github.com/talos-systems/talos/pkg/machinery/client" "github.com/talos-systems/talos/pkg/reporter" diff --git a/go.mod b/go.mod index b9e43086a..4d163be4b 100644 --- a/go.mod +++ b/go.mod @@ -92,15 +92,18 @@ require ( github.com/siderolabs/discovery-client v0.1.2 github.com/siderolabs/gen v0.4.0 github.com/siderolabs/go-blockdevice v0.4.0 + github.com/siderolabs/go-circular v0.1.0 + github.com/siderolabs/go-kubeconfig v0.1.0 github.com/siderolabs/go-loadbalancer v0.2.0 github.com/siderolabs/go-pcidb v0.1.0 github.com/siderolabs/go-pointer v1.0.0 github.com/siderolabs/go-smbios v0.3.1 + github.com/siderolabs/go-tail v0.1.0 github.com/siderolabs/grpc-proxy v0.4.0 github.com/siderolabs/siderolink v0.2.0 github.com/spf13/cobra v1.6.0 github.com/spf13/pflag v1.0.5 - github.com/stretchr/testify v1.8.0 + github.com/stretchr/testify v1.8.1 github.com/talos-systems/go-cmd v0.1.0 github.com/talos-systems/go-debug v0.2.1 github.com/talos-systems/go-kmsg v0.1.1 diff --git a/go.sum b/go.sum index b081e5dde..4ac8d9501 100644 --- a/go.sum +++ b/go.sum @@ -1062,6 +1062,10 @@ github.com/siderolabs/go-api-signature v0.1.0 h1:8tDoOUg5Ns1DMv+7dGy8s8mKeKygZMJ github.com/siderolabs/go-api-signature v0.1.0/go.mod h1:9jSRiJsuKyf6b/hyKcBgCZXvu7xGJ+RiCQQAVraGdN0= github.com/siderolabs/go-blockdevice v0.4.0 h1:b+5b4Lu35U6MFXQDE2g0hAW6pTqkJypPF8WbktbbXlQ= github.com/siderolabs/go-blockdevice v0.4.0/go.mod h1:viu/gYeRpdr41UcR39SjlFr4vID4WyaIuGd8lOKuyA4= +github.com/siderolabs/go-circular v0.1.0 h1:zpBJNUbCZSh0odZxA4Dcj0d3ShLLR2WxKW6hTdAtoiE= +github.com/siderolabs/go-circular v0.1.0/go.mod h1:14XnLf/I3J0VjzTgmwWNGjp58/bdIi4zXppAEx8plfw= +github.com/siderolabs/go-kubeconfig v0.1.0 h1:t/2oMWkLSdWHXglKPMz8ySXnx6ZjHckeGY79NaDcBTo= +github.com/siderolabs/go-kubeconfig v0.1.0/go.mod h1:eM3mO02Td6wYDvdi9zTbMrj1Q4WqEFN8XQ6pNjCUWkI= github.com/siderolabs/go-loadbalancer v0.2.0 h1:Uv7nJqIwqtRVgUShsQwHmThfJTugpDhmizIvYwa/Rdk= github.com/siderolabs/go-loadbalancer v0.2.0/go.mod h1:pWeG20ljyAL4PTJehpDnkoEHUr8ZL/6C/zpfgPtbe70= github.com/siderolabs/go-pcidb v0.1.0 h1:6cJPBBmHlIF4GouYR/1g3JXS/niAON+6lIOfKl/t794= @@ -1070,6 +1074,8 @@ github.com/siderolabs/go-pointer v1.0.0 h1:6TshPKep2doDQJAAtHUuHWXbca8ZfyRySjSBT github.com/siderolabs/go-pointer v1.0.0/go.mod h1:HTRFUNYa3R+k0FFKNv11zgkaCLzEkWVzoYZ433P3kHc= github.com/siderolabs/go-smbios v0.3.1 h1:lS7axBwd485Gevb20a8QSimBh2OoP88P2SgA2mSHumA= github.com/siderolabs/go-smbios v0.3.1/go.mod h1:Yyyj/yY9qrYTXZnQViyhGuCV4b3IRSZNj3sOXrLvl/c= +github.com/siderolabs/go-tail v0.1.0 h1:U+ZClt7BXLGsxDNU/XQ12sz7lQElfFZBYEPdkW78Qro= +github.com/siderolabs/go-tail v0.1.0/go.mod h1:vWxumnRUS3eTZczORCJW3QMjxiTETN31vyuFdaW8rPw= github.com/siderolabs/grpc-proxy v0.4.0 h1:zYrhqLYs8JlYoLHYeel7/XwXDZ4OJ5XyP9wX7JlbPew= github.com/siderolabs/grpc-proxy v0.4.0/go.mod h1:QDurYOwQD4H8BKyvCuUxMiuG/etYnb/++xaQB644NdU= github.com/siderolabs/protoenc v0.2.0 h1:QFxWIAo//12+/bm27GNYoK/TpQGTYsRrrZCu9jSghvU= @@ -1125,8 +1131,9 @@ github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= -github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v0.0.0-20180303142811-b89eecf5ca5d/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -1136,8 +1143,9 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= diff --git a/internal/app/machined/pkg/runtime/logging/circular.go b/internal/app/machined/pkg/runtime/logging/circular.go index 47b993c4f..f402a5472 100644 --- a/internal/app/machined/pkg/runtime/logging/circular.go +++ b/internal/app/machined/pkg/runtime/logging/circular.go @@ -15,11 +15,11 @@ import ( "sync" "time" + "github.com/siderolabs/go-circular" + "github.com/siderolabs/go-tail" "github.com/talos-systems/go-debug" "github.com/talos-systems/talos/internal/app/machined/pkg/runtime" - "github.com/talos-systems/talos/pkg/circular" - "github.com/talos-systems/talos/pkg/tail" ) // These constants should some day move to config. diff --git a/internal/app/machined/pkg/runtime/logging/file.go b/internal/app/machined/pkg/runtime/logging/file.go index 7bd4940ee..58166c456 100644 --- a/internal/app/machined/pkg/runtime/logging/file.go +++ b/internal/app/machined/pkg/runtime/logging/file.go @@ -12,9 +12,10 @@ import ( "path/filepath" "strings" + "github.com/siderolabs/go-tail" + "github.com/talos-systems/talos/internal/app/machined/pkg/runtime" "github.com/talos-systems/talos/pkg/follow" - "github.com/talos-systems/talos/pkg/tail" ) // FileLoggingManager implements simple logging to files. diff --git a/internal/pkg/containers/container.go b/internal/pkg/containers/container.go index 611246d8e..77c88facd 100644 --- a/internal/pkg/containers/container.go +++ b/internal/pkg/containers/container.go @@ -14,10 +14,11 @@ import ( "strings" "syscall" + "github.com/siderolabs/go-tail" + "github.com/talos-systems/talos/pkg/chunker" "github.com/talos-systems/talos/pkg/chunker/file" "github.com/talos-systems/talos/pkg/chunker/stream" - "github.com/talos-systems/talos/pkg/tail" ) // Container presents information about a container. diff --git a/pkg/circular/circular.go b/pkg/circular/circular.go deleted file mode 100644 index 1f2d65939..000000000 --- a/pkg/circular/circular.go +++ /dev/null @@ -1,160 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -// Package circular provides a buffer with circular semantics. -package circular - -import ( - "fmt" - "sync" -) - -// Buffer implements circular buffer with a thread-safe writer, -// that supports multiple readers each with its own offset. -type Buffer struct { - opt Options - - // synchronizing access to data, off - mu sync.Mutex - cond *sync.Cond - - // data slice, might grow up to MaxCapacity, then used - // as circular buffer - data []byte - - // write offset, always goes up, actual offset in data slice - // is (off % cap(data)) - off int64 -} - -// NewBuffer creates new Buffer with specified options. -func NewBuffer(opts ...OptionFunc) (*Buffer, error) { - buf := &Buffer{ - opt: defaultOptions(), - } - - for _, o := range opts { - if err := o(&buf.opt); err != nil { - return nil, err - } - } - - if buf.opt.InitialCapacity > buf.opt.MaxCapacity { - return nil, fmt.Errorf("initial capacity (%d) should be less or equal to max capacity (%d)", buf.opt.InitialCapacity, buf.opt.MaxCapacity) - } - - if buf.opt.SafetyGap >= buf.opt.MaxCapacity { - return nil, fmt.Errorf("safety gap (%d) should be less than max capacity (%d)", buf.opt.SafetyGap, buf.opt.MaxCapacity) - } - - buf.data = make([]byte, buf.opt.InitialCapacity) - buf.cond = sync.NewCond(&buf.mu) - - return buf, nil -} - -// Write implements io.Writer interface. -func (buf *Buffer) Write(p []byte) (int, error) { - l := len(p) - if l == 0 { - return 0, nil - } - - buf.mu.Lock() - defer buf.mu.Unlock() - - if buf.off < int64(buf.opt.MaxCapacity) { - if buf.off+int64(l) > int64(cap(buf.data)) && cap(buf.data) < buf.opt.MaxCapacity { - // grow buffer to ensure write fits, but limit with max capacity - size := cap(buf.data) * 2 - for size < int(buf.off)+l { - size *= 2 - } - - if size > buf.opt.MaxCapacity { - size = buf.opt.MaxCapacity - } - - data := make([]byte, size) - copy(data, buf.data) - buf.data = data - } - } - - var n int - for n < l { - i := int(buf.off % int64(buf.opt.MaxCapacity)) - - nn := buf.opt.MaxCapacity - i - if nn > len(p) { - nn = len(p) - } - - copy(buf.data[i:], p[:nn]) - - buf.off += int64(nn) - n += nn - p = p[nn:] - } - - buf.cond.Broadcast() - - return n, nil -} - -// Capacity returns number of bytes allocated for the buffer. -func (buf *Buffer) Capacity() int { - buf.mu.Lock() - defer buf.mu.Unlock() - - return cap(buf.data) -} - -// Offset returns current write offset (number of bytes written). -func (buf *Buffer) Offset() int64 { - buf.mu.Lock() - defer buf.mu.Unlock() - - return buf.off -} - -// GetStreamingReader returns StreamingReader object which implements io.ReadCloser, io.Seeker. -// -// StreamingReader starts at the most distant position in the past available. -func (buf *Buffer) GetStreamingReader() *StreamingReader { - buf.mu.Lock() - defer buf.mu.Unlock() - - off := buf.off - int64(buf.opt.MaxCapacity-buf.opt.SafetyGap) - if off < 0 { - off = 0 - } - - return &StreamingReader{ - buf: buf, - initialOff: off, - off: off, - } -} - -// GetReader returns Reader object which implements io.ReadCloser, io.Seeker. -// -// Reader starts at the most distant position in the past available and goes -// to the current write position. -func (buf *Buffer) GetReader() *Reader { - buf.mu.Lock() - defer buf.mu.Unlock() - - off := buf.off - int64(buf.opt.MaxCapacity-buf.opt.SafetyGap) - if off < 0 { - off = 0 - } - - return &Reader{ - buf: buf, - startOff: off, - endOff: buf.off, - off: off, - } -} diff --git a/pkg/circular/circular_test.go b/pkg/circular/circular_test.go deleted file mode 100644 index 3af21db80..000000000 --- a/pkg/circular/circular_test.go +++ /dev/null @@ -1,423 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package circular_test - -import ( - "bytes" - "context" - "io" - "math/rand" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/suite" - "golang.org/x/time/rate" - - "github.com/talos-systems/talos/pkg/circular" -) - -type CircularSuite struct { - suite.Suite -} - -func (suite *CircularSuite) TestWrites() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(100000)) - suite.Require().NoError(err) - - n, err := buf.Write(nil) - suite.Require().NoError(err) - suite.Require().Equal(0, n) - - n, err = buf.Write(make([]byte, 100)) - suite.Require().NoError(err) - suite.Require().Equal(100, n) - - n, err = buf.Write(make([]byte, 1000)) - suite.Require().NoError(err) - suite.Require().Equal(1000, n) - - suite.Require().Equal(2048, buf.Capacity()) - suite.Require().EqualValues(1100, buf.Offset()) - - n, err = buf.Write(make([]byte, 5000)) - suite.Require().NoError(err) - suite.Require().Equal(5000, n) - - suite.Require().Equal(8192, buf.Capacity()) - suite.Require().EqualValues(6100, buf.Offset()) - - for i := 0; i < 20; i++ { - l := 1 << i - - n, err = buf.Write(make([]byte, l)) - suite.Require().NoError(err) - suite.Require().Equal(l, n) - } - - suite.Require().Equal(100000, buf.Capacity()) - suite.Require().EqualValues(6100+(1<<20)-1, buf.Offset()) -} - -func (suite *CircularSuite) TestStreamingReadWriter() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536)) - suite.Require().NoError(err) - - r := buf.GetStreamingReader() - - size := 1048576 - - data := make([]byte, size) - for i := range data { - data[i] = byte(rand.Int31()) - } - - var wg sync.WaitGroup - defer wg.Wait() - - wg.Add(1) - - go func() { - defer wg.Done() - - p := data - - r := rate.NewLimiter(300_000, 1000) - - for i := 0; i < len(data); { - l := 100 + rand.Intn(100) - - if i+l > len(data) { - l = len(data) - i - } - - r.WaitN(context.Background(), l) //nolint:errcheck - - n, e := buf.Write(p[:l]) - suite.Require().NoError(e) - suite.Require().Equal(l, n) - - i += l - p = p[l:] - } - }() - - actual := make([]byte, size) - - n, err := io.ReadFull(r, actual) - suite.Require().NoError(err) - suite.Require().Equal(size, n) - - suite.Require().Equal(data, actual) - - s := make(chan struct{}) - - go func() { - _, err = r.Read(make([]byte, 1)) - - suite.Assert().Equal(err, circular.ErrClosed) - - close(s) - }() - - time.Sleep(50 * time.Millisecond) // wait for the goroutine to start - - suite.Require().NoError(r.Close()) - - // close should abort reader - <-s - - _, err = r.Read(nil) - suite.Require().Equal(circular.ErrClosed, err) -} - -func (suite *CircularSuite) TestStreamingMultipleReaders() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536)) - suite.Require().NoError(err) - - n := 5 - - readers := make([]*circular.StreamingReader, n) - - for i := 0; i < n; i++ { - readers[i] = buf.GetStreamingReader() - } - - size := 1048576 - - data := make([]byte, size) - for i := range data { - data[i] = byte(rand.Int31()) - } - - var wg sync.WaitGroup - defer wg.Wait() - - for i := 0; i < n; i++ { - wg.Add(1) - - i := i - - go func() { - defer wg.Done() - - actual := make([]byte, size) - - nn, err := io.ReadFull(readers[i], actual) - suite.Require().NoError(err) - suite.Assert().Equal(size, nn) - - suite.Assert().Equal(data, actual) - }() - } - - p := data - - r := rate.NewLimiter(300_000, 1000) - - for i := 0; i < len(data); { - l := 256 - - if i+l > len(data) { - l = len(data) - i - } - - r.WaitN(context.Background(), l) //nolint:errcheck - - n, e := buf.Write(p[:l]) - suite.Require().NoError(e) - suite.Require().Equal(l, n) - - i += l - p = p[l:] - } -} - -func (suite *CircularSuite) TestStreamingLateAndIdleReaders() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536), circular.WithSafetyGap(256)) - suite.Require().NoError(err) - - idleR := buf.GetStreamingReader() - - size := 100000 - - data := make([]byte, size) - for i := range data { - data[i] = byte(rand.Int31()) - } - - n, err := buf.Write(data) - suite.Require().NoError(err) - suite.Require().Equal(size, n) - - lateR := buf.GetStreamingReader() - - go func() { - time.Sleep(50 * time.Millisecond) - - suite.Require().NoError(lateR.Close()) - }() - - actual, err := io.ReadAll(lateR) - suite.Require().Equal(circular.ErrClosed, err) - suite.Require().Equal(65536-256, len(actual)) - - suite.Require().Equal(data[size-65536+256:], actual) - - go func() { - time.Sleep(50 * time.Millisecond) - - suite.Require().NoError(idleR.Close()) - }() - - actual, err = io.ReadAll(idleR) - suite.Require().Equal(circular.ErrClosed, err) - suite.Require().Equal(65536, len(actual)) - - suite.Require().Equal(data[size-65536:], actual) -} - -func (suite *CircularSuite) TestStreamingSeek() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536), circular.WithSafetyGap(256)) - suite.Require().NoError(err) - - _, err = buf.Write(bytes.Repeat([]byte{0xff}, 512)) - suite.Require().NoError(err) - - r := buf.GetStreamingReader() - - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 512)) - suite.Require().NoError(err) - - off, err := r.Seek(0, io.SeekCurrent) - suite.Require().NoError(err) - suite.Assert().EqualValues(0, off) - - data := make([]byte, 256) - - n, err := r.Read(data) - suite.Require().NoError(err) - suite.Assert().Equal(256, n) - suite.Assert().Equal(bytes.Repeat([]byte{0xff}, 256), data) - - off, err = r.Seek(0, io.SeekCurrent) - suite.Require().NoError(err) - suite.Assert().EqualValues(256, off) - - off, err = r.Seek(-256, io.SeekEnd) - suite.Require().NoError(err) - suite.Assert().EqualValues(768, off) - - n, err = r.Read(data) - suite.Require().NoError(err) - suite.Assert().Equal(256, n) - suite.Assert().Equal(bytes.Repeat([]byte{0xfe}, 256), data) - - off, err = r.Seek(2048, io.SeekStart) - suite.Require().NoError(err) - suite.Assert().EqualValues(1024, off) - - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 65536-256)) - suite.Require().NoError(err) - - off, err = r.Seek(0, io.SeekStart) - suite.Require().NoError(err) - suite.Assert().EqualValues(1024, off) - - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 1024)) - suite.Require().NoError(err) - - off, err = r.Seek(0, io.SeekCurrent) - suite.Require().NoError(err) - suite.Assert().EqualValues(2048, off) - - _, err = r.Seek(-100, io.SeekStart) - suite.Require().Equal(circular.ErrSeekBeforeStart, err) -} - -func (suite *CircularSuite) TestRegularReaderEmpty() { - buf, err := circular.NewBuffer() - suite.Require().NoError(err) - - n, err := buf.GetReader().Read(nil) - suite.Require().Equal(0, n) - suite.Require().Equal(io.EOF, err) -} - -func (suite *CircularSuite) TestRegularReader() { - buf, err := circular.NewBuffer() - suite.Require().NoError(err) - - _, err = buf.Write(bytes.Repeat([]byte{0xff}, 512)) - suite.Require().NoError(err) - - r := buf.GetReader() - - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 512)) - suite.Require().NoError(err) - - actual, err := io.ReadAll(r) - suite.Require().NoError(err) - suite.Require().Equal(bytes.Repeat([]byte{0xff}, 512), actual) -} - -func (suite *CircularSuite) TestRegularReaderOutOfSync() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536), circular.WithSafetyGap(256)) - suite.Require().NoError(err) - - _, err = buf.Write(bytes.Repeat([]byte{0xff}, 512)) - suite.Require().NoError(err) - - r := buf.GetReader() - - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 65536-256)) - suite.Require().NoError(err) - - _, err = r.Read(nil) - suite.Require().Equal(err, circular.ErrOutOfSync) -} - -func (suite *CircularSuite) TestRegularReaderFull() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(4096), circular.WithSafetyGap(256)) - suite.Require().NoError(err) - - _, err = buf.Write(bytes.Repeat([]byte{0xff}, 6146)) - suite.Require().NoError(err) - - r := buf.GetReader() - - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 100)) - suite.Require().NoError(err) - - actual, err := io.ReadAll(r) - suite.Require().NoError(err) - suite.Require().Equal(bytes.Repeat([]byte{0xff}, 4096-256), actual) - - suite.Require().NoError(r.Close()) - - _, err = r.Read(nil) - suite.Require().Equal(err, circular.ErrClosed) -} - -func (suite *CircularSuite) TestRegularSeek() { - buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536), circular.WithSafetyGap(256)) - suite.Require().NoError(err) - - _, err = buf.Write(bytes.Repeat([]byte{0xff}, 512)) - suite.Require().NoError(err) - - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 512)) - suite.Require().NoError(err) - - r := buf.GetReader() - - _, err = buf.Write(bytes.Repeat([]byte{0xfc}, 512)) - suite.Require().NoError(err) - - off, err := r.Seek(0, io.SeekCurrent) - suite.Require().NoError(err) - suite.Assert().EqualValues(0, off) - - data := make([]byte, 256) - - n, err := r.Read(data) - suite.Require().NoError(err) - suite.Assert().Equal(256, n) - suite.Assert().Equal(bytes.Repeat([]byte{0xff}, 256), data) - - off, err = r.Seek(0, io.SeekCurrent) - suite.Require().NoError(err) - suite.Assert().EqualValues(256, off) - - off, err = r.Seek(-256, io.SeekEnd) - suite.Require().NoError(err) - suite.Assert().EqualValues(768, off) - - n, err = r.Read(data) - suite.Require().NoError(err) - suite.Assert().Equal(256, n) - suite.Assert().Equal(bytes.Repeat([]byte{0xfe}, 256), data) - - off, err = r.Seek(2048, io.SeekStart) - suite.Require().NoError(err) - suite.Assert().EqualValues(1024, off) - - _, err = buf.Write(bytes.Repeat([]byte{0xfe}, 65536-256)) - suite.Require().NoError(err) - - off, err = r.Seek(0, io.SeekStart) - suite.Require().NoError(err) - suite.Assert().EqualValues(0, off) - - _, err = r.Seek(-100, io.SeekStart) - suite.Require().Equal(circular.ErrSeekBeforeStart, err) - - _, err = r.Read(nil) - suite.Require().Equal(circular.ErrOutOfSync, err) -} - -func TestCircularSuite(t *testing.T) { - suite.Run(t, new(CircularSuite)) -} diff --git a/pkg/circular/errors.go b/pkg/circular/errors.go deleted file mode 100644 index 79ab41a15..000000000 --- a/pkg/circular/errors.go +++ /dev/null @@ -1,16 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package circular - -import "errors" - -// ErrClosed is raised on read from closed Reader. -var ErrClosed = errors.New("reader is closed") - -// ErrSeekBeforeStart is raised when seek goes beyond start of the file. -var ErrSeekBeforeStart = errors.New("seek before start") - -// ErrOutOfSync is raised when reader got too much out of sync with the writer. -var ErrOutOfSync = errors.New("buffer overrun, read position overwritten") diff --git a/pkg/circular/options.go b/pkg/circular/options.go deleted file mode 100644 index 56281fa65..000000000 --- a/pkg/circular/options.go +++ /dev/null @@ -1,69 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package circular - -import "fmt" - -// Options defines settings for Buffer. -type Options struct { - InitialCapacity int - MaxCapacity int - SafetyGap int -} - -// defaultOptions returns default initial values. -func defaultOptions() Options { - return Options{ - InitialCapacity: 16384, - MaxCapacity: 1048576, - SafetyGap: 1024, - } -} - -// OptionFunc allows setting Buffer options. -type OptionFunc func(*Options) error - -// WithInitialCapacity sets initial buffer capacity. -func WithInitialCapacity(capacity int) OptionFunc { - return func(opt *Options) error { - if capacity <= 0 { - return fmt.Errorf("initial capacity should be positive: %d", capacity) - } - - opt.InitialCapacity = capacity - - return nil - } -} - -// WithMaxCapacity sets maximum buffer capacity. -func WithMaxCapacity(capacity int) OptionFunc { - return func(opt *Options) error { - if capacity <= 0 { - return fmt.Errorf("max capacity should be positive: %d", capacity) - } - - opt.MaxCapacity = capacity - - return nil - } -} - -// WithSafetyGap sets safety gap between readers and writers to avoid buffer overrun for the reader. -// -// Reader initial position is set to be as far as possible in the buffer history, but next concurrent write -// might overwrite read position, and safety gap helps to prevent it. With safety gap, maximum available -// bytes to read are: MaxCapacity-SafetyGap. -func WithSafetyGap(gap int) OptionFunc { - return func(opt *Options) error { - if gap <= 0 { - return fmt.Errorf("safety gap should be positive: %q", gap) - } - - opt.SafetyGap = gap - - return nil - } -} diff --git a/pkg/circular/reader.go b/pkg/circular/reader.go deleted file mode 100644 index 9531e4b1f..000000000 --- a/pkg/circular/reader.go +++ /dev/null @@ -1,103 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package circular - -import ( - "io" - "sync/atomic" -) - -// Reader implements seekable reader with local position in the Buffer which -// reads from the fixed part of the buffer. -// -// Reader is not safe to be used with concurrent Read/Seek operations. -type Reader struct { - buf *Buffer - - startOff, endOff int64 - off int64 - - closed uint32 -} - -// Read implements io.Reader. -func (r *Reader) Read(p []byte) (n int, err error) { - if atomic.LoadUint32(&r.closed) > 0 { - err = ErrClosed - - return - } - - r.buf.mu.Lock() - defer r.buf.mu.Unlock() - - if r.off < r.buf.off-int64(r.buf.opt.MaxCapacity) { - // reader is falling too much behind - err = ErrOutOfSync - - return - } - - if r.off == r.endOff { - err = io.EOF - - return - } - - if len(p) == 0 { - return - } - - n = int(r.endOff - r.off) - if n > len(p) { - n = len(p) - } - - i := int(r.off % int64(r.buf.opt.MaxCapacity)) - - if l := r.buf.opt.MaxCapacity - i; l < n { - copy(p, r.buf.data[i:]) - copy(p[l:], r.buf.data[:n-l]) - } else { - copy(p, r.buf.data[i:i+n]) - } - - r.off += int64(n) - - return n, err -} - -// Close implements io.Closer. -func (r *Reader) Close() error { - atomic.StoreUint32(&r.closed, 1) - - return nil -} - -// Seek implements io.Seeker. -func (r *Reader) Seek(offset int64, whence int) (int64, error) { - newOff := r.off - - switch whence { - case io.SeekCurrent: - newOff += offset - case io.SeekEnd: - newOff = r.endOff + offset - case io.SeekStart: - newOff = r.startOff + offset - } - - if newOff < r.startOff { - return r.off - r.startOff, ErrSeekBeforeStart - } - - if newOff > r.endOff { - newOff = r.endOff - } - - r.off = newOff - - return r.off - r.startOff, nil -} diff --git a/pkg/circular/streaming.go b/pkg/circular/streaming.go deleted file mode 100644 index de8ff5ae1..000000000 --- a/pkg/circular/streaming.go +++ /dev/null @@ -1,117 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package circular - -import ( - "io" - "sync/atomic" -) - -// StreamingReader implements seekable reader with local position in the Buffer. -// -// StreamingReader is not safe to be used with concurrent Read/Seek operations. -// -// StreamingReader blocks for new data once it exhausts contents of the buffer. -type StreamingReader struct { - buf *Buffer - - initialOff int64 - off int64 - - closed uint32 -} - -// Read implements io.Reader. -func (r *StreamingReader) Read(p []byte) (n int, err error) { - if atomic.LoadUint32(&r.closed) > 0 { - err = ErrClosed - - return - } - - if len(p) == 0 { - return - } - - r.buf.mu.Lock() - defer r.buf.mu.Unlock() - - if r.off < r.buf.off-int64(r.buf.opt.MaxCapacity) { - // reader is falling too much behind, so need to rewind to the first available position - r.off = r.buf.off - int64(r.buf.opt.MaxCapacity) - } - - for r.off == r.buf.off { - r.buf.cond.Wait() - - if atomic.LoadUint32(&r.closed) > 0 { - err = ErrClosed - - return - } - } - - n = int(r.buf.off - r.off) - if n > len(p) { - n = len(p) - } - - i := int(r.off % int64(r.buf.opt.MaxCapacity)) - - if l := r.buf.opt.MaxCapacity - i; l < n { - copy(p, r.buf.data[i:]) - copy(p[l:], r.buf.data[:n-l]) - } else { - copy(p, r.buf.data[i:i+n]) - } - - r.off += int64(n) - - return n, err -} - -// Close implements io.Closer. -func (r *StreamingReader) Close() error { - if atomic.CompareAndSwapUint32(&r.closed, 0, 1) { - // wake up readers - r.buf.cond.Broadcast() - } - - return nil -} - -// Seek implements io.Seeker. -func (r *StreamingReader) Seek(offset int64, whence int) (int64, error) { - newOff := r.off - - r.buf.mu.Lock() - writeOff := r.buf.off - r.buf.mu.Unlock() - - switch whence { - case io.SeekCurrent: - newOff += offset - case io.SeekEnd: - newOff = writeOff + offset - case io.SeekStart: - newOff = r.initialOff + offset - } - - if newOff < r.initialOff { - return r.off - r.initialOff, ErrSeekBeforeStart - } - - if newOff > writeOff { - newOff = writeOff - } - - if newOff < writeOff-int64(r.buf.opt.MaxCapacity-r.buf.opt.SafetyGap) { - newOff = writeOff - int64(r.buf.opt.MaxCapacity-r.buf.opt.SafetyGap) - } - - r.off = newOff - - return r.off - r.initialOff, nil -} diff --git a/pkg/grpc/middleware/authz/metadata.go b/pkg/grpc/middleware/authz/metadata.go index 6d805561b..00ec62373 100644 --- a/pkg/grpc/middleware/authz/metadata.go +++ b/pkg/grpc/middleware/authz/metadata.go @@ -9,12 +9,13 @@ import ( "google.golang.org/grpc/metadata" + "github.com/talos-systems/talos/pkg/machinery/constants" "github.com/talos-systems/talos/pkg/machinery/role" ) // mdKey is used to store roles in gRPC metadata. // Should be used only in this file. -const mdKey = "talos-role" +const mdKey = constants.APIAuthzRoleMetadataKey // SetMetadata sets given roles in gRPC metadata. func SetMetadata(md metadata.MD, roles role.Set) { diff --git a/pkg/kubeconfig/kubeconfig.go b/pkg/kubeconfig/kubeconfig.go index c7e97876a..e51a083b7 100644 --- a/pkg/kubeconfig/kubeconfig.go +++ b/pkg/kubeconfig/kubeconfig.go @@ -2,35 +2,5 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at http://mozilla.org/MPL/2.0/. -// Package kubeconfig provides Kubernetes config file handling. +// Package kubeconfig provides Kubernetes config file generation. package kubeconfig - -import ( - "fmt" - "os" - "path/filepath" -) - -// DefaultPath returns path to ~/.kube/config. -func DefaultPath() (string, error) { - home, err := os.UserHomeDir() - if err != nil { - return "", err - } - - return filepath.Join(home, ".kube/config"), nil -} - -// SinglePath parses KUBECONFIG and the default kubeconfig file location -// and ensures there is only one to return. -func SinglePath() (string, error) { - envVarFilePaths := filepath.SplitList(os.Getenv("KUBECONFIG")) - switch len(envVarFilePaths) { - case 0: - return DefaultPath() - case 1: - return envVarFilePaths[0], nil - default: - return "", fmt.Errorf("multiple kubeconfig files defined") - } -} diff --git a/pkg/kubeconfig/kubeconfig_test.go b/pkg/kubeconfig/kubeconfig_test.go deleted file mode 100644 index bbfc66988..000000000 --- a/pkg/kubeconfig/kubeconfig_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package kubeconfig_test - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/talos-systems/talos/pkg/kubeconfig" -) - -func TestSinglePath(t *testing.T) { - expectedDefaultPath, err := kubeconfig.DefaultPath() - assert.NoError(t, err) - - for _, tt := range []struct { - name string - envVar string - shouldErr bool - expected string - }{ - { - name: "NoKUBECONFIGSet", - shouldErr: false, - expected: expectedDefaultPath, - }, - { - name: "UseKUBECONFIGSet", - envVar: "/my/custom/path/to/kubeconfig", - shouldErr: false, - expected: "/my/custom/path/to/kubeconfig", - }, - { - name: "MultiKUBECONFIGSet", - envVar: "/my/custom/path/to/kubeconfig:/another/path/to/kubeconfig:/foo/bar/kubeconfig", - shouldErr: true, - }, - } { - tt := tt - - t.Run(tt.name, func(t *testing.T) { - if tt.envVar != "" { - t.Setenv("KUBECONFIG", tt.envVar) - } - result, err := kubeconfig.SinglePath() - - if tt.shouldErr { - require.Error(t, err) - } else { - require.NoError(t, err) - } - - assert.Equal(t, tt.expected, result) - }) - } -} diff --git a/pkg/kubeconfig/merge.go b/pkg/kubeconfig/merge.go deleted file mode 100644 index 4845b79aa..000000000 --- a/pkg/kubeconfig/merge.go +++ /dev/null @@ -1,225 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package kubeconfig - -import ( - "fmt" - "io" - "reflect" - "strings" - - "k8s.io/client-go/tools/clientcmd" - clientcmdapi "k8s.io/client-go/tools/clientcmd/api" -) - -// Merger handles merging of Kubernetes client config files. -type Merger clientcmdapi.Config - -// Load the kubeconfig from file. -func Load(path string) (*Merger, error) { - config, err := clientcmd.LoadFromFile(path) - if err != nil { - return nil, err - } - - return (*Merger)(config), err -} - -// MergeOptions controls Merge process. -type MergeOptions struct { - ForceContextName string - ActivateContext bool - ConflictHandler func(ConfigComponent, string) (ConflictDecision, error) - OutputWriter io.Writer -} - -// ConfigComponent identifies part of kubeconfig. -type ConfigComponent string - -// Kubeconfig components. -const ( - Cluster ConfigComponent = "cluster" - AuthInfo ConfigComponent = "auth" - Context ConfigComponent = "context" -) - -// ConflictDecision is returned from ConflictHandler. -type ConflictDecision string - -// Conflict decisions. -const ( - OverwriteDecision ConflictDecision = "overwrite" - RenameDecision ConflictDecision = "rename" -) - -// Merge the provided kubernetes config in. -// -//nolint:gocyclo,cyclop -func (merger *Merger) Merge(config *clientcmdapi.Config, options MergeOptions) error { - mappedClusters := map[string]string{} - mappedAuthInfos := map[string]string{} - mappedContexts := map[string]string{} - - for name, newCluster := range config.Clusters { - mergedName := name - - oldCluster, exists := merger.Clusters[mergedName] - - newCluster.LocationOfOrigin = "" - - if oldCluster != nil { - oldCluster.LocationOfOrigin = "" - } - - if exists && !reflect.DeepEqual(oldCluster, newCluster) { - decision, err := options.ConflictHandler(Cluster, name) - if err != nil { - return err - } - - if decision == RenameDecision { - mergedName = merger.rename(Cluster, mergedName) - } - } - - mappedClusters[name] = mergedName - } - - for name, newAuthInfo := range config.AuthInfos { - mergedName := name - - // apply previous mappings done to cluster names - for oldName, newName := range mappedClusters { - mergedName = strings.ReplaceAll(mergedName, oldName, newName) - } - - oldAuthInfo, exists := merger.AuthInfos[mergedName] - - newAuthInfo.LocationOfOrigin = "" - - if oldAuthInfo != nil { - oldAuthInfo.LocationOfOrigin = "" - } - - if exists && !reflect.DeepEqual(oldAuthInfo, newAuthInfo) { - decision, err := options.ConflictHandler(AuthInfo, name) - if err != nil { - return err - } - - if decision == RenameDecision { - mergedName = merger.rename(AuthInfo, mergedName) - } - } - - mappedAuthInfos[name] = mergedName - } - - for name, newContext := range config.Contexts { - mergedName := name - - // apply mappings done to authInfo, as authInfo has same format as context in Talos - for oldName, newName := range mappedAuthInfos { - mergedName = strings.ReplaceAll(mergedName, oldName, newName) - } - - if options.ForceContextName != "" { - mergedName = options.ForceContextName - } - - oldContext, exists := merger.Clusters[mergedName] - - newContext.LocationOfOrigin = "" - - if oldContext != nil { - oldContext.LocationOfOrigin = "" - } - - if exists && !reflect.DeepEqual(oldContext, newContext) { - decision, err := options.ConflictHandler(Cluster, name) - if err != nil { - return err - } - - if decision == RenameDecision { - mergedName = merger.rename(Cluster, mergedName) - } - } - - mappedContexts[name] = mergedName - } - - for name, cluster := range config.Clusters { - newName := mappedClusters[name] - - if newName != name { - fmt.Fprintf(options.OutputWriter, "renamed cluster %q -> %q\n", name, newName) - } - - merger.Clusters[newName] = cluster - } - - for name, authInfo := range config.AuthInfos { - newName := mappedAuthInfos[name] - - if newName != name { - fmt.Fprintf(options.OutputWriter, "renamed auth info %q -> %q\n", name, newName) - } - - merger.AuthInfos[newName] = authInfo - } - - for name, context := range config.Contexts { - contextCopy := *context - - newName := mappedContexts[name] - - if newName != name { - fmt.Fprintf(options.OutputWriter, "renamed context %q -> %q\n", name, newName) - } - - contextCopy.AuthInfo = mappedAuthInfos[contextCopy.AuthInfo] - contextCopy.Cluster = mappedClusters[contextCopy.Cluster] - - merger.Contexts[newName] = &contextCopy - - if options.ActivateContext { - merger.CurrentContext = newName - } - } - - return nil -} - -// rename the config component until it gets unique. -func (merger *Merger) rename(component ConfigComponent, name string) (newName string) { - i := 0 - newName = name - - for { - var exists bool - - switch component { - case Cluster: - _, exists = merger.Clusters[newName] - case AuthInfo: - _, exists = merger.AuthInfos[newName] - case Context: - _, exists = merger.Contexts[newName] - } - - if !exists { - return newName - } - - i++ - newName = fmt.Sprintf("%s-%d", name, i) - } -} - -// Write the kubeconfig back to the file. -func (merger *Merger) Write(path string) error { - return clientcmd.WriteToFile(clientcmdapi.Config(*merger), path) -} diff --git a/pkg/kubeconfig/merge_test.go b/pkg/kubeconfig/merge_test.go deleted file mode 100644 index 5dee0a02d..000000000 --- a/pkg/kubeconfig/merge_test.go +++ /dev/null @@ -1,375 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package kubeconfig_test - -import ( - "fmt" - "os" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - clientcmdapi "k8s.io/client-go/tools/clientcmd/api" - - "github.com/talos-systems/talos/pkg/kubeconfig" -) - -func TestMerger(t *testing.T) { - errorAlways := func(kubeconfig.ConfigComponent, string) (kubeconfig.ConflictDecision, error) { - return "", fmt.Errorf("shouldn't be here") - } - renameAlways := func(kubeconfig.ConfigComponent, string) (kubeconfig.ConflictDecision, error) { - return kubeconfig.RenameDecision, nil - } - overwriteAlways := func(kubeconfig.ConfigComponent, string) (kubeconfig.ConflictDecision, error) { - return kubeconfig.OverwriteDecision, nil - } - - for _, tt := range []struct { - name string - initial clientcmdapi.Config - new clientcmdapi.Config - expected clientcmdapi.Config - options kubeconfig.MergeOptions - }{ - { // MergeIntoEmpty - name: "MergeIntoEmpty", - initial: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{}, - Clusters: map[string]*clientcmdapi.Cluster{}, - Contexts: map[string]*clientcmdapi.Context{}, - }, - new: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{ - "foo@bar": { - ClientCertificate: "cert1", - }, - }, - Clusters: map[string]*clientcmdapi.Cluster{ - "bar": { - Server: "example.com", - }, - }, - Contexts: map[string]*clientcmdapi.Context{ - "foo@bar": { - Cluster: "bar", - AuthInfo: "foo@bar", - }, - }, - CurrentContext: "nothing", - }, - expected: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{ - "foo@bar": { - ClientCertificate: "cert1", - }, - }, - Clusters: map[string]*clientcmdapi.Cluster{ - "bar": { - Server: "example.com", - }, - }, - Contexts: map[string]*clientcmdapi.Context{ - "foo@bar": { - Cluster: "bar", - AuthInfo: "foo@bar", - }, - }, - }, - options: kubeconfig.MergeOptions{ - ConflictHandler: errorAlways, - OutputWriter: os.Stdout, - }, - }, - { // MergeClean - name: "MergeClean", - initial: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{ - "foo@bar": { - ClientCertificate: "cert1", - }, - }, - Clusters: map[string]*clientcmdapi.Cluster{ - "bar": { - Server: "example.com", - }, - }, - Contexts: map[string]*clientcmdapi.Context{ - "foo@bar": { - Cluster: "bar", - AuthInfo: "foo@bar", - }, - }, - CurrentContext: "foo@bar", - }, - new: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{ - "fiz@buzz": { - ClientCertificate: "cert2", - }, - }, - Clusters: map[string]*clientcmdapi.Cluster{ - "buzz": { - Server: "another.com", - }, - }, - Contexts: map[string]*clientcmdapi.Context{ - "fiz@buzz": { - Cluster: "buzz", - AuthInfo: "fiz@buzz", - }, - }, - }, - expected: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{ - "foo@bar": { - ClientCertificate: "cert1", - }, - "fiz@buzz": { - ClientCertificate: "cert2", - }, - }, - Clusters: map[string]*clientcmdapi.Cluster{ - "bar": { - Server: "example.com", - }, - "buzz": { - Server: "another.com", - }, - }, - Contexts: map[string]*clientcmdapi.Context{ - "foo@bar": { - Cluster: "bar", - AuthInfo: "foo@bar", - }, - "fiz@buzz": { - Cluster: "buzz", - AuthInfo: "fiz@buzz", - }, - }, - CurrentContext: "fiz@buzz", - }, - options: kubeconfig.MergeOptions{ - ActivateContext: true, - ConflictHandler: errorAlways, - OutputWriter: os.Stdout, - }, - }, - { // MergeRename - name: "MergeRename", - initial: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{ - "foo@bar": { - ClientCertificate: "cert1", - }, - }, - Clusters: map[string]*clientcmdapi.Cluster{ - "bar": { - Server: "example.com", - }, - }, - Contexts: map[string]*clientcmdapi.Context{ - "foo@bar": { - Cluster: "bar", - AuthInfo: "foo@bar", - }, - }, - CurrentContext: "foo@bar", - }, - new: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{ - "foo@bar": { - ClientCertificate: "cert2", - }, - }, - Clusters: map[string]*clientcmdapi.Cluster{ - "bar": { - Server: "another.com", - }, - }, - Contexts: map[string]*clientcmdapi.Context{ - "foo@bar": { - Cluster: "bar", - AuthInfo: "foo@bar", - }, - }, - }, - expected: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{ - "foo@bar": { - ClientCertificate: "cert1", - }, - "foo@bar-1": { - ClientCertificate: "cert2", - }, - }, - Clusters: map[string]*clientcmdapi.Cluster{ - "bar": { - Server: "example.com", - }, - "bar-1": { - Server: "another.com", - }, - }, - Contexts: map[string]*clientcmdapi.Context{ - "foo@bar": { - Cluster: "bar", - AuthInfo: "foo@bar", - }, - "foo@bar-1": { - Cluster: "bar-1", - AuthInfo: "foo@bar-1", - }, - }, - CurrentContext: "foo@bar", - }, - options: kubeconfig.MergeOptions{ - ConflictHandler: renameAlways, - OutputWriter: os.Stdout, - }, - }, - { // MergeOverwrite - name: "MergeOverwrite", - initial: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{ - "foo@bar": { - ClientCertificate: "cert1", - }, - }, - Clusters: map[string]*clientcmdapi.Cluster{ - "bar": { - Server: "example.com", - }, - }, - Contexts: map[string]*clientcmdapi.Context{ - "foo@bar": { - Cluster: "bar", - AuthInfo: "foo@bar", - }, - }, - CurrentContext: "foo@bar", - }, - new: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{ - "foo@bar": { - ClientCertificate: "cert2", - }, - }, - Clusters: map[string]*clientcmdapi.Cluster{ - "bar": { - Server: "another.com", - }, - }, - Contexts: map[string]*clientcmdapi.Context{ - "foo@bar": { - Cluster: "bar", - AuthInfo: "foo@bar", - }, - }, - }, - expected: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{ - "foo@bar": { - ClientCertificate: "cert2", - }, - }, - Clusters: map[string]*clientcmdapi.Cluster{ - "bar": { - Server: "another.com", - }, - }, - Contexts: map[string]*clientcmdapi.Context{ - "foo@bar": { - Cluster: "bar", - AuthInfo: "foo@bar", - }, - }, - CurrentContext: "foo@bar", - }, - options: kubeconfig.MergeOptions{ - ConflictHandler: overwriteAlways, - OutputWriter: os.Stdout, - }, - }, - { // MergeEqual - name: "MergeEqual", - initial: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{ - "foo@bar": { - ClientCertificate: "cert1", - }, - }, - Clusters: map[string]*clientcmdapi.Cluster{ - "bar": { - Server: "example.com", - }, - }, - Contexts: map[string]*clientcmdapi.Context{ - "foo@bar": { - Cluster: "bar", - AuthInfo: "foo@bar", - }, - }, - CurrentContext: "foo@bar", - }, - new: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{ - "foo@bar": { - ClientCertificate: "cert1", - }, - }, - Clusters: map[string]*clientcmdapi.Cluster{ - "bar": { - Server: "example.com", - }, - }, - Contexts: map[string]*clientcmdapi.Context{ - "foo@bar": { - Cluster: "bar", - AuthInfo: "foo@bar", - }, - }, - CurrentContext: "foo@bar", - }, - expected: clientcmdapi.Config{ - AuthInfos: map[string]*clientcmdapi.AuthInfo{ - "foo@bar": { - ClientCertificate: "cert1", - }, - }, - Clusters: map[string]*clientcmdapi.Cluster{ - "bar": { - Server: "example.com", - }, - }, - Contexts: map[string]*clientcmdapi.Context{ - "foo@bar": { - Cluster: "bar", - AuthInfo: "foo@bar", - }, - }, - CurrentContext: "foo@bar", - }, - options: kubeconfig.MergeOptions{ - ConflictHandler: errorAlways, - OutputWriter: os.Stdout, - }, - }, - } { - tt := tt - - t.Run(tt.name, func(t *testing.T) { - merger := kubeconfig.Merger(*tt.initial.DeepCopy()) - - err := merger.Merge(&tt.new, tt.options) - require.NoError(t, err) - - assert.Equal(t, tt.expected.Clusters, merger.Clusters) - assert.Equal(t, tt.expected.AuthInfos, merger.AuthInfos) - assert.Equal(t, tt.expected.Contexts, merger.Contexts) - assert.Equal(t, tt.expected.CurrentContext, merger.CurrentContext) - }) - } -} diff --git a/pkg/machinery/constants/constants.go b/pkg/machinery/constants/constants.go index 2cd6e44fb..897019d02 100644 --- a/pkg/machinery/constants/constants.go +++ b/pkg/machinery/constants/constants.go @@ -761,6 +761,9 @@ const ( // TrustdMaxProcs is the maximum number of GOMAXPROCS for trustd. TrustdMaxProcs = 2 + + // APIAuthzRoleMetadataKey is the gRPC metadata key used to submit a role with os:impersonator. + APIAuthzRoleMetadataKey = "talos-role" ) // See https://linux.die.net/man/3/klogctl diff --git a/pkg/provision/providers/vm/crashdump.go b/pkg/provision/providers/vm/crashdump.go index 926a368e2..2ad84c99b 100644 --- a/pkg/provision/providers/vm/crashdump.go +++ b/pkg/provision/providers/vm/crashdump.go @@ -12,8 +12,9 @@ import ( "path/filepath" "strings" + "github.com/siderolabs/go-tail" + "github.com/talos-systems/talos/pkg/provision" - "github.com/talos-systems/talos/pkg/tail" ) // CrashDump produces debug information to help with debugging failures. diff --git a/pkg/tail/tail.go b/pkg/tail/tail.go deleted file mode 100644 index a251e6079..000000000 --- a/pkg/tail/tail.go +++ /dev/null @@ -1,94 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -// Package tail implements tailing of [io.ReadSeeker]. -package tail - -import ( - "bytes" - "fmt" - "io" -) - -// Window is the read window size for tail scanning. -const Window = 4096 - -// SeekLines seeks the passed io.ReadSeeker so that it's -N lines from the tail. -// -// SeekLines might modify file offset even in case of error. -// -//nolint:gocyclo -func SeekLines(r io.ReadSeeker, lines int) error { - offset, err := r.Seek(0, io.SeekEnd) - if err != nil { - return err - } - - readOffset := offset - Window - if readOffset < 0 { - readOffset = 0 - } - - readSize := offset - readOffset - - skippedLines := -1 // we need to skip (lines + 1) \n characters to find position to read from - - buf := make([]byte, Window) - firstRead := true - - for skippedLines < lines && readSize > 0 { - _, err = r.Seek(readOffset, io.SeekStart) - if err != nil { - return err - } - - var n int - - n, err = r.Read(buf[:readSize]) - if err != nil { - return err - } - - if int64(n) != readSize { - return fmt.Errorf("unexpected short read: %d != %d", n, readSize) - } - - if firstRead && buf[n-1] != '\n' { - // last line might not have '\n' - skippedLines++ - } - - firstRead = false - - for n > 0 && skippedLines < lines { - index := bytes.LastIndexByte(buf[:n], '\n') - if index == -1 { - break - } - - skippedLines++ - - n = index - } - - if skippedLines == lines { - readOffset += int64(n) + 1 - - break - } - - offset = readOffset - readOffset -= Window - - if readOffset < 0 { - readOffset = 0 - } - - readSize = offset - readOffset - } - - _, err = r.Seek(readOffset, io.SeekStart) - - return err -} diff --git a/pkg/tail/tail_test.go b/pkg/tail/tail_test.go deleted file mode 100644 index 2884fc860..000000000 --- a/pkg/tail/tail_test.go +++ /dev/null @@ -1,96 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. - -package tail_test - -import ( - "bufio" - "bytes" - "io" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/talos-systems/talos/pkg/tail" -) - -func TestSkipLines(t *testing.T) { - for _, test := range []struct { - desc string - input []byte - tailLines []int - expectLines []int - }{ - { - desc: "empty", - input: nil, - tailLines: []int{0, 1, 2, 10}, - expectLines: []int{0, 0, 0, 0}, - }, - { - desc: "enormous line", - input: bytes.Repeat([]byte{0xde, 0xad, 0xbe, 0xef}, 8000), - tailLines: []int{0, 1, 10}, - expectLines: []int{0, 1, 1}, - }, - { - desc: "enormous line with \\n", - input: append(bytes.Repeat([]byte{0xde, 0xad, 0xbe, 0xef}, 8000), '\n'), - tailLines: []int{0, 1, 10}, - expectLines: []int{0, 1, 1}, - }, - { - desc: "many small lines", - input: bytes.Repeat([]byte{0xde, 0xad, 0xbe, 0xef, '\n'}, 1024), - tailLines: []int{0, 1, 3, 10, 100, 1000}, - expectLines: []int{0, 1, 3, 10, 100, 1000}, - }, - { - desc: "many small aligned lines", - input: bytes.Repeat([]byte{0xde, 0xad, 0xbe, '\n'}, 1024), - tailLines: []int{0, 1, 3, 10, 100, 1000}, - expectLines: []int{0, 1, 3, 10, 100, 1000}, - }, - { - desc: "empty lines", - input: bytes.Repeat([]byte{'\n'}, 65536), - tailLines: []int{0, 1, 3, 10, 100, 1000, 10000}, - expectLines: []int{0, 1, 3, 10, 100, 1000, 10000}, - }, - { - desc: "window-sized lines", - input: bytes.Repeat(append(bytes.Repeat([]byte{'a'}, tail.Window-1), '\n'), 24), - tailLines: []int{0, 1, 3, 10, 100, 1000}, - expectLines: []int{0, 1, 3, 10, 24, 24}, - }, - { - desc: "long lines", - input: bytes.Repeat(append(bytes.Repeat([]byte{'a'}, 356), '\n'), 24), - tailLines: []int{0, 1, 3, 10, 15, 24, 100, 1000}, - expectLines: []int{0, 1, 3, 10, 15, 24, 24, 24}, - }, - } { - for i, lines := range test.tailLines { - r := bytes.NewReader(test.input) - - err := tail.SeekLines(r, lines) - assert.NoError(t, err, "test %q", test.desc) - - tailOffset, _ := r.Seek(0, io.SeekCurrent) //nolint:errcheck - - expected := test.expectLines[i] - actual := 0 - - scanner := bufio.NewScanner(r) - - for scanner.Scan() { - actual++ - } - - assert.NoError(t, scanner.Err(), "test %q", test.desc) - - assert.Equal(t, expected, actual, "test %q, tailOffset %d", test.desc, tailOffset) - } - } -}