mirror of
https://github.com/siderolabs/talos.git
synced 2026-05-05 04:16:21 +02:00
refactor: use go-circular, go-kubeconfig, and go-tail
Remove the Talos in-tree versions and use the newly extracted Go modules instead.

Signed-off-by: Andrey Smirnov <andrey.smirnov@talos-systems.com>
This commit is contained in:
parent
c6e1702eca
commit
d7edd0e2e6
@ -20,6 +20,7 @@ import (
|
||||
|
||||
humanize "github.com/dustin/go-humanize"
|
||||
"github.com/siderolabs/go-blockdevice/blockdevice/encryption"
|
||||
"github.com/siderolabs/go-kubeconfig"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/talos-systems/go-procfs/procfs"
|
||||
@ -30,7 +31,6 @@ import (
|
||||
"github.com/talos-systems/talos/pkg/cli"
|
||||
"github.com/talos-systems/talos/pkg/cluster/check"
|
||||
"github.com/talos-systems/talos/pkg/images"
|
||||
"github.com/talos-systems/talos/pkg/kubeconfig"
|
||||
clientconfig "github.com/talos-systems/talos/pkg/machinery/client/config"
|
||||
"github.com/talos-systems/talos/pkg/machinery/config"
|
||||
"github.com/talos-systems/talos/pkg/machinery/config/configpatcher"
|
||||
|
||||
@ -14,11 +14,11 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/mattn/go-isatty"
|
||||
"github.com/siderolabs/go-kubeconfig"
|
||||
"github.com/spf13/cobra"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
|
||||
"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
|
||||
"github.com/talos-systems/talos/pkg/kubeconfig"
|
||||
"github.com/talos-systems/talos/pkg/machinery/client"
|
||||
)
|
||||
|
||||
|
||||
@ -11,12 +11,12 @@ import (
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"github.com/siderolabs/go-circular"
|
||||
"github.com/talos-systems/go-retry/retry"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc/codes"
|
||||
|
||||
"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
|
||||
"github.com/talos-systems/talos/pkg/circular"
|
||||
"github.com/talos-systems/talos/pkg/machinery/api/common"
|
||||
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
|
||||
"github.com/talos-systems/talos/pkg/machinery/client"
|
||||
|
||||
@ -16,13 +16,13 @@ import (
|
||||
|
||||
"github.com/mattn/go-isatty"
|
||||
"github.com/siderolabs/gen/maps"
|
||||
"github.com/siderolabs/go-circular"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/backoff"
|
||||
|
||||
"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/global"
|
||||
"github.com/talos-systems/talos/cmd/talosctl/pkg/talos/helpers"
|
||||
"github.com/talos-systems/talos/pkg/circular"
|
||||
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
|
||||
"github.com/talos-systems/talos/pkg/machinery/client"
|
||||
"github.com/talos-systems/talos/pkg/reporter"
|
||||
|
||||
5
go.mod
5
go.mod
@ -92,15 +92,18 @@ require (
|
||||
github.com/siderolabs/discovery-client v0.1.2
|
||||
github.com/siderolabs/gen v0.4.0
|
||||
github.com/siderolabs/go-blockdevice v0.4.0
|
||||
github.com/siderolabs/go-circular v0.1.0
|
||||
github.com/siderolabs/go-kubeconfig v0.1.0
|
||||
github.com/siderolabs/go-loadbalancer v0.2.0
|
||||
github.com/siderolabs/go-pcidb v0.1.0
|
||||
github.com/siderolabs/go-pointer v1.0.0
|
||||
github.com/siderolabs/go-smbios v0.3.1
|
||||
github.com/siderolabs/go-tail v0.1.0
|
||||
github.com/siderolabs/grpc-proxy v0.4.0
|
||||
github.com/siderolabs/siderolink v0.2.0
|
||||
github.com/spf13/cobra v1.6.0
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/stretchr/testify v1.8.0
|
||||
github.com/stretchr/testify v1.8.1
|
||||
github.com/talos-systems/go-cmd v0.1.0
|
||||
github.com/talos-systems/go-debug v0.2.1
|
||||
github.com/talos-systems/go-kmsg v0.1.1
|
||||
|
||||
12
go.sum
12
go.sum
@ -1062,6 +1062,10 @@ github.com/siderolabs/go-api-signature v0.1.0 h1:8tDoOUg5Ns1DMv+7dGy8s8mKeKygZMJ
|
||||
github.com/siderolabs/go-api-signature v0.1.0/go.mod h1:9jSRiJsuKyf6b/hyKcBgCZXvu7xGJ+RiCQQAVraGdN0=
|
||||
github.com/siderolabs/go-blockdevice v0.4.0 h1:b+5b4Lu35U6MFXQDE2g0hAW6pTqkJypPF8WbktbbXlQ=
|
||||
github.com/siderolabs/go-blockdevice v0.4.0/go.mod h1:viu/gYeRpdr41UcR39SjlFr4vID4WyaIuGd8lOKuyA4=
|
||||
github.com/siderolabs/go-circular v0.1.0 h1:zpBJNUbCZSh0odZxA4Dcj0d3ShLLR2WxKW6hTdAtoiE=
|
||||
github.com/siderolabs/go-circular v0.1.0/go.mod h1:14XnLf/I3J0VjzTgmwWNGjp58/bdIi4zXppAEx8plfw=
|
||||
github.com/siderolabs/go-kubeconfig v0.1.0 h1:t/2oMWkLSdWHXglKPMz8ySXnx6ZjHckeGY79NaDcBTo=
|
||||
github.com/siderolabs/go-kubeconfig v0.1.0/go.mod h1:eM3mO02Td6wYDvdi9zTbMrj1Q4WqEFN8XQ6pNjCUWkI=
|
||||
github.com/siderolabs/go-loadbalancer v0.2.0 h1:Uv7nJqIwqtRVgUShsQwHmThfJTugpDhmizIvYwa/Rdk=
|
||||
github.com/siderolabs/go-loadbalancer v0.2.0/go.mod h1:pWeG20ljyAL4PTJehpDnkoEHUr8ZL/6C/zpfgPtbe70=
|
||||
github.com/siderolabs/go-pcidb v0.1.0 h1:6cJPBBmHlIF4GouYR/1g3JXS/niAON+6lIOfKl/t794=
|
||||
@ -1070,6 +1074,8 @@ github.com/siderolabs/go-pointer v1.0.0 h1:6TshPKep2doDQJAAtHUuHWXbca8ZfyRySjSBT
|
||||
github.com/siderolabs/go-pointer v1.0.0/go.mod h1:HTRFUNYa3R+k0FFKNv11zgkaCLzEkWVzoYZ433P3kHc=
|
||||
github.com/siderolabs/go-smbios v0.3.1 h1:lS7axBwd485Gevb20a8QSimBh2OoP88P2SgA2mSHumA=
|
||||
github.com/siderolabs/go-smbios v0.3.1/go.mod h1:Yyyj/yY9qrYTXZnQViyhGuCV4b3IRSZNj3sOXrLvl/c=
|
||||
github.com/siderolabs/go-tail v0.1.0 h1:U+ZClt7BXLGsxDNU/XQ12sz7lQElfFZBYEPdkW78Qro=
|
||||
github.com/siderolabs/go-tail v0.1.0/go.mod h1:vWxumnRUS3eTZczORCJW3QMjxiTETN31vyuFdaW8rPw=
|
||||
github.com/siderolabs/grpc-proxy v0.4.0 h1:zYrhqLYs8JlYoLHYeel7/XwXDZ4OJ5XyP9wX7JlbPew=
|
||||
github.com/siderolabs/grpc-proxy v0.4.0/go.mod h1:QDurYOwQD4H8BKyvCuUxMiuG/etYnb/++xaQB644NdU=
|
||||
github.com/siderolabs/protoenc v0.2.0 h1:QFxWIAo//12+/bm27GNYoK/TpQGTYsRrrZCu9jSghvU=
|
||||
@ -1125,8 +1131,9 @@ github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
|
||||
github.com/stretchr/objx v0.4.0 h1:M2gUjqZET1qApGOWNSnZ49BAIMX4F/1plDv3+l31EJ4=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
github.com/stretchr/testify v0.0.0-20180303142811-b89eecf5ca5d/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
@ -1136,8 +1143,9 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
|
||||
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
|
||||
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
|
||||
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
|
||||
github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
|
||||
|
||||
@ -15,11 +15,11 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/siderolabs/go-circular"
|
||||
"github.com/siderolabs/go-tail"
|
||||
"github.com/talos-systems/go-debug"
|
||||
|
||||
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
|
||||
"github.com/talos-systems/talos/pkg/circular"
|
||||
"github.com/talos-systems/talos/pkg/tail"
|
||||
)
|
||||
|
||||
// These constants should some day move to config.
|
||||
|
||||
@ -12,9 +12,10 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/siderolabs/go-tail"
|
||||
|
||||
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
|
||||
"github.com/talos-systems/talos/pkg/follow"
|
||||
"github.com/talos-systems/talos/pkg/tail"
|
||||
)
|
||||
|
||||
// FileLoggingManager implements simple logging to files.
|
||||
|
||||
@ -14,10 +14,11 @@ import (
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/siderolabs/go-tail"
|
||||
|
||||
"github.com/talos-systems/talos/pkg/chunker"
|
||||
"github.com/talos-systems/talos/pkg/chunker/file"
|
||||
"github.com/talos-systems/talos/pkg/chunker/stream"
|
||||
"github.com/talos-systems/talos/pkg/tail"
|
||||
)
|
||||
|
||||
// Container presents information about a container.
|
||||
|
||||
@ -1,160 +0,0 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
// Package circular provides a buffer with circular semantics.
|
||||
package circular
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Buffer implements circular buffer with a thread-safe writer,
// that supports multiple readers each with its own offset.
type Buffer struct {
	// immutable after NewBuffer returns
	opt Options

	// synchronizing access to data, off
	mu   sync.Mutex
	cond *sync.Cond // broadcast on every Write so blocked StreamingReaders wake up

	// data slice, might grow up to MaxCapacity, then used
	// as circular buffer
	data []byte

	// write offset, always goes up, actual offset in data slice
	// is (off % cap(data))
	off int64
}
|
||||
|
||||
// NewBuffer creates new Buffer with specified options.
|
||||
func NewBuffer(opts ...OptionFunc) (*Buffer, error) {
|
||||
buf := &Buffer{
|
||||
opt: defaultOptions(),
|
||||
}
|
||||
|
||||
for _, o := range opts {
|
||||
if err := o(&buf.opt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if buf.opt.InitialCapacity > buf.opt.MaxCapacity {
|
||||
return nil, fmt.Errorf("initial capacity (%d) should be less or equal to max capacity (%d)", buf.opt.InitialCapacity, buf.opt.MaxCapacity)
|
||||
}
|
||||
|
||||
if buf.opt.SafetyGap >= buf.opt.MaxCapacity {
|
||||
return nil, fmt.Errorf("safety gap (%d) should be less than max capacity (%d)", buf.opt.SafetyGap, buf.opt.MaxCapacity)
|
||||
}
|
||||
|
||||
buf.data = make([]byte, buf.opt.InitialCapacity)
|
||||
buf.cond = sync.NewCond(&buf.mu)
|
||||
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
// Write implements io.Writer interface.
//
// Write never fails: while total bytes written are below MaxCapacity the
// backing slice grows by doubling (capped at MaxCapacity); after that the
// write wraps around, overwriting the oldest data.
func (buf *Buffer) Write(p []byte) (int, error) {
	l := len(p)
	if l == 0 {
		return 0, nil
	}

	buf.mu.Lock()
	defer buf.mu.Unlock()

	if buf.off < int64(buf.opt.MaxCapacity) {
		if buf.off+int64(l) > int64(cap(buf.data)) && cap(buf.data) < buf.opt.MaxCapacity {
			// grow buffer to ensure write fits, but limit with max capacity
			size := cap(buf.data) * 2
			for size < int(buf.off)+l {
				size *= 2
			}

			if size > buf.opt.MaxCapacity {
				size = buf.opt.MaxCapacity
			}

			data := make([]byte, size)
			copy(data, buf.data)
			buf.data = data
		}
	}

	var n int
	for n < l {
		// physical position in the circular buffer for the current write offset
		i := int(buf.off % int64(buf.opt.MaxCapacity))

		// contiguous room until the physical end of the buffer;
		// a write spanning the end is done in multiple loop iterations
		nn := buf.opt.MaxCapacity - i
		if nn > len(p) {
			nn = len(p)
		}

		copy(buf.data[i:], p[:nn])

		buf.off += int64(nn)
		n += nn
		p = p[nn:]
	}

	// wake up streaming readers blocked waiting for new data
	buf.cond.Broadcast()

	return n, nil
}
|
||||
|
||||
// Capacity returns number of bytes allocated for the buffer.
|
||||
func (buf *Buffer) Capacity() int {
|
||||
buf.mu.Lock()
|
||||
defer buf.mu.Unlock()
|
||||
|
||||
return cap(buf.data)
|
||||
}
|
||||
|
||||
// Offset returns current write offset (number of bytes written).
|
||||
func (buf *Buffer) Offset() int64 {
|
||||
buf.mu.Lock()
|
||||
defer buf.mu.Unlock()
|
||||
|
||||
return buf.off
|
||||
}
|
||||
|
||||
// GetStreamingReader returns StreamingReader object which implements io.ReadCloser, io.Seeker.
|
||||
//
|
||||
// StreamingReader starts at the most distant position in the past available.
|
||||
func (buf *Buffer) GetStreamingReader() *StreamingReader {
|
||||
buf.mu.Lock()
|
||||
defer buf.mu.Unlock()
|
||||
|
||||
off := buf.off - int64(buf.opt.MaxCapacity-buf.opt.SafetyGap)
|
||||
if off < 0 {
|
||||
off = 0
|
||||
}
|
||||
|
||||
return &StreamingReader{
|
||||
buf: buf,
|
||||
initialOff: off,
|
||||
off: off,
|
||||
}
|
||||
}
|
||||
|
||||
// GetReader returns Reader object which implements io.ReadCloser, io.Seeker.
|
||||
//
|
||||
// Reader starts at the most distant position in the past available and goes
|
||||
// to the current write position.
|
||||
func (buf *Buffer) GetReader() *Reader {
|
||||
buf.mu.Lock()
|
||||
defer buf.mu.Unlock()
|
||||
|
||||
off := buf.off - int64(buf.opt.MaxCapacity-buf.opt.SafetyGap)
|
||||
if off < 0 {
|
||||
off = 0
|
||||
}
|
||||
|
||||
return &Reader{
|
||||
buf: buf,
|
||||
startOff: off,
|
||||
endOff: buf.off,
|
||||
off: off,
|
||||
}
|
||||
}
|
||||
@ -1,423 +0,0 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package circular_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/talos-systems/talos/pkg/circular"
|
||||
)
|
||||
|
||||
// CircularSuite is a testify suite exercising the circular buffer package:
// writes, snapshot readers, streaming readers and seeking.
type CircularSuite struct {
	suite.Suite
}
|
||||
|
||||
// TestWrites verifies buffer growth on writes: capacity doubles until
// MaxCapacity, while Offset tracks the total number of bytes ever written.
func (suite *CircularSuite) TestWrites() {
	buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(100000))
	suite.Require().NoError(err)

	// zero-length write is a no-op
	n, err := buf.Write(nil)
	suite.Require().NoError(err)
	suite.Require().Equal(0, n)

	n, err = buf.Write(make([]byte, 100))
	suite.Require().NoError(err)
	suite.Require().Equal(100, n)

	n, err = buf.Write(make([]byte, 1000))
	suite.Require().NoError(err)
	suite.Require().Equal(1000, n)

	// 1100 bytes written still fit the initial 2048-byte capacity
	suite.Require().Equal(2048, buf.Capacity())
	suite.Require().EqualValues(1100, buf.Offset())

	n, err = buf.Write(make([]byte, 5000))
	suite.Require().NoError(err)
	suite.Require().Equal(5000, n)

	// buffer grew by doubling: 2048 -> 4096 -> 8192
	suite.Require().Equal(8192, buf.Capacity())
	suite.Require().EqualValues(6100, buf.Offset())

	for i := 0; i < 20; i++ {
		l := 1 << i

		n, err = buf.Write(make([]byte, l))
		suite.Require().NoError(err)
		suite.Require().Equal(l, n)
	}

	// capacity is clamped at MaxCapacity; offset keeps growing past it
	suite.Require().Equal(100000, buf.Capacity())
	suite.Require().EqualValues(6100+(1<<20)-1, buf.Offset())
}
|
||||
|
||||
func (suite *CircularSuite) TestStreamingReadWriter() {
|
||||
buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536))
|
||||
suite.Require().NoError(err)
|
||||
|
||||
r := buf.GetStreamingReader()
|
||||
|
||||
size := 1048576
|
||||
|
||||
data := make([]byte, size)
|
||||
for i := range data {
|
||||
data[i] = byte(rand.Int31())
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
defer wg.Wait()
|
||||
|
||||
wg.Add(1)
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
p := data
|
||||
|
||||
r := rate.NewLimiter(300_000, 1000)
|
||||
|
||||
for i := 0; i < len(data); {
|
||||
l := 100 + rand.Intn(100)
|
||||
|
||||
if i+l > len(data) {
|
||||
l = len(data) - i
|
||||
}
|
||||
|
||||
r.WaitN(context.Background(), l) //nolint:errcheck
|
||||
|
||||
n, e := buf.Write(p[:l])
|
||||
suite.Require().NoError(e)
|
||||
suite.Require().Equal(l, n)
|
||||
|
||||
i += l
|
||||
p = p[l:]
|
||||
}
|
||||
}()
|
||||
|
||||
actual := make([]byte, size)
|
||||
|
||||
n, err := io.ReadFull(r, actual)
|
||||
suite.Require().NoError(err)
|
||||
suite.Require().Equal(size, n)
|
||||
|
||||
suite.Require().Equal(data, actual)
|
||||
|
||||
s := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
_, err = r.Read(make([]byte, 1))
|
||||
|
||||
suite.Assert().Equal(err, circular.ErrClosed)
|
||||
|
||||
close(s)
|
||||
}()
|
||||
|
||||
time.Sleep(50 * time.Millisecond) // wait for the goroutine to start
|
||||
|
||||
suite.Require().NoError(r.Close())
|
||||
|
||||
// close should abort reader
|
||||
<-s
|
||||
|
||||
_, err = r.Read(nil)
|
||||
suite.Require().Equal(circular.ErrClosed, err)
|
||||
}
|
||||
|
||||
// TestStreamingMultipleReaders verifies that several StreamingReaders, each
// with its own offset, all independently receive the full written stream.
func (suite *CircularSuite) TestStreamingMultipleReaders() {
	buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536))
	suite.Require().NoError(err)

	n := 5

	readers := make([]*circular.StreamingReader, n)

	for i := 0; i < n; i++ {
		readers[i] = buf.GetStreamingReader()
	}

	size := 1048576

	data := make([]byte, size)
	for i := range data {
		data[i] = byte(rand.Int31())
	}

	var wg sync.WaitGroup
	defer wg.Wait()

	for i := 0; i < n; i++ {
		wg.Add(1)

		// capture the loop variable (required before Go 1.22)
		i := i

		go func() {
			defer wg.Done()

			actual := make([]byte, size)

			nn, err := io.ReadFull(readers[i], actual)
			suite.Require().NoError(err)
			suite.Assert().Equal(size, nn)

			suite.Assert().Equal(data, actual)
		}()
	}

	p := data

	// rate-limit writes so the readers keep up and the buffer never overruns
	r := rate.NewLimiter(300_000, 1000)

	for i := 0; i < len(data); {
		l := 256

		if i+l > len(data) {
			l = len(data) - i
		}

		r.WaitN(context.Background(), l) //nolint:errcheck

		n, e := buf.Write(p[:l])
		suite.Require().NoError(e)
		suite.Require().Equal(l, n)

		i += l
		p = p[l:]
	}
}
|
||||
|
||||
// TestStreamingLateAndIdleReaders verifies readers observing a wrapped buffer:
// a late-created reader sees the last MaxCapacity-SafetyGap bytes, while an
// idle reader that was lapped rewinds to the last MaxCapacity bytes.
func (suite *CircularSuite) TestStreamingLateAndIdleReaders() {
	buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536), circular.WithSafetyGap(256))
	suite.Require().NoError(err)

	// created before any writes, then left idle while the buffer wraps
	idleR := buf.GetStreamingReader()

	size := 100000

	data := make([]byte, size)
	for i := range data {
		data[i] = byte(rand.Int31())
	}

	n, err := buf.Write(data)
	suite.Require().NoError(err)
	suite.Require().Equal(size, n)

	// created after the buffer already wrapped
	lateR := buf.GetStreamingReader()

	go func() {
		time.Sleep(50 * time.Millisecond)

		// unblock ReadAll below once the buffered data is exhausted
		suite.Require().NoError(lateR.Close())
	}()

	actual, err := io.ReadAll(lateR)
	suite.Require().Equal(circular.ErrClosed, err)
	// late reader starts MaxCapacity-SafetyGap behind the write offset
	suite.Require().Equal(65536-256, len(actual))

	suite.Require().Equal(data[size-65536+256:], actual)

	go func() {
		time.Sleep(50 * time.Millisecond)

		suite.Require().NoError(idleR.Close())
	}()

	actual, err = io.ReadAll(idleR)
	suite.Require().Equal(circular.ErrClosed, err)
	// idle reader was lapped by the writer and rewound to the oldest byte
	suite.Require().Equal(65536, len(actual))

	suite.Require().Equal(data[size-65536:], actual)
}
|
||||
|
||||
// TestStreamingSeek verifies StreamingReader.Seek: reported offsets are
// relative to the reader's initial position, and seeks are clamped to the
// currently available data window.
func (suite *CircularSuite) TestStreamingSeek() {
	buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536), circular.WithSafetyGap(256))
	suite.Require().NoError(err)

	_, err = buf.Write(bytes.Repeat([]byte{0xff}, 512))
	suite.Require().NoError(err)

	// reader starts at offset 0 of the buffer (512 bytes written so far)
	r := buf.GetStreamingReader()

	_, err = buf.Write(bytes.Repeat([]byte{0xfe}, 512))
	suite.Require().NoError(err)

	off, err := r.Seek(0, io.SeekCurrent)
	suite.Require().NoError(err)
	suite.Assert().EqualValues(0, off)

	data := make([]byte, 256)

	n, err := r.Read(data)
	suite.Require().NoError(err)
	suite.Assert().Equal(256, n)
	suite.Assert().Equal(bytes.Repeat([]byte{0xff}, 256), data)

	off, err = r.Seek(0, io.SeekCurrent)
	suite.Require().NoError(err)
	suite.Assert().EqualValues(256, off)

	off, err = r.Seek(-256, io.SeekEnd)
	suite.Require().NoError(err)
	suite.Assert().EqualValues(768, off)

	n, err = r.Read(data)
	suite.Require().NoError(err)
	suite.Assert().Equal(256, n)
	suite.Assert().Equal(bytes.Repeat([]byte{0xfe}, 256), data)

	// seeking past the end clamps to the current write offset (1024 bytes written)
	off, err = r.Seek(2048, io.SeekStart)
	suite.Require().NoError(err)
	suite.Assert().EqualValues(1024, off)

	// wrap the buffer so the reader's original start is no longer available
	_, err = buf.Write(bytes.Repeat([]byte{0xfe}, 65536-256))
	suite.Require().NoError(err)

	off, err = r.Seek(0, io.SeekStart)
	suite.Require().NoError(err)
	suite.Assert().EqualValues(1024, off)

	_, err = buf.Write(bytes.Repeat([]byte{0xfe}, 1024))
	suite.Require().NoError(err)

	off, err = r.Seek(0, io.SeekCurrent)
	suite.Require().NoError(err)
	suite.Assert().EqualValues(2048, off)

	_, err = r.Seek(-100, io.SeekStart)
	suite.Require().Equal(circular.ErrSeekBeforeStart, err)
}
|
||||
|
||||
func (suite *CircularSuite) TestRegularReaderEmpty() {
|
||||
buf, err := circular.NewBuffer()
|
||||
suite.Require().NoError(err)
|
||||
|
||||
n, err := buf.GetReader().Read(nil)
|
||||
suite.Require().Equal(0, n)
|
||||
suite.Require().Equal(io.EOF, err)
|
||||
}
|
||||
|
||||
func (suite *CircularSuite) TestRegularReader() {
|
||||
buf, err := circular.NewBuffer()
|
||||
suite.Require().NoError(err)
|
||||
|
||||
_, err = buf.Write(bytes.Repeat([]byte{0xff}, 512))
|
||||
suite.Require().NoError(err)
|
||||
|
||||
r := buf.GetReader()
|
||||
|
||||
_, err = buf.Write(bytes.Repeat([]byte{0xfe}, 512))
|
||||
suite.Require().NoError(err)
|
||||
|
||||
actual, err := io.ReadAll(r)
|
||||
suite.Require().NoError(err)
|
||||
suite.Require().Equal(bytes.Repeat([]byte{0xff}, 512), actual)
|
||||
}
|
||||
|
||||
func (suite *CircularSuite) TestRegularReaderOutOfSync() {
|
||||
buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536), circular.WithSafetyGap(256))
|
||||
suite.Require().NoError(err)
|
||||
|
||||
_, err = buf.Write(bytes.Repeat([]byte{0xff}, 512))
|
||||
suite.Require().NoError(err)
|
||||
|
||||
r := buf.GetReader()
|
||||
|
||||
_, err = buf.Write(bytes.Repeat([]byte{0xfe}, 65536-256))
|
||||
suite.Require().NoError(err)
|
||||
|
||||
_, err = r.Read(nil)
|
||||
suite.Require().Equal(err, circular.ErrOutOfSync)
|
||||
}
|
||||
|
||||
func (suite *CircularSuite) TestRegularReaderFull() {
|
||||
buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(4096), circular.WithSafetyGap(256))
|
||||
suite.Require().NoError(err)
|
||||
|
||||
_, err = buf.Write(bytes.Repeat([]byte{0xff}, 6146))
|
||||
suite.Require().NoError(err)
|
||||
|
||||
r := buf.GetReader()
|
||||
|
||||
_, err = buf.Write(bytes.Repeat([]byte{0xfe}, 100))
|
||||
suite.Require().NoError(err)
|
||||
|
||||
actual, err := io.ReadAll(r)
|
||||
suite.Require().NoError(err)
|
||||
suite.Require().Equal(bytes.Repeat([]byte{0xff}, 4096-256), actual)
|
||||
|
||||
suite.Require().NoError(r.Close())
|
||||
|
||||
_, err = r.Read(nil)
|
||||
suite.Require().Equal(err, circular.ErrClosed)
|
||||
}
|
||||
|
||||
// TestRegularSeek verifies Reader.Seek against the fixed snapshot captured
// at GetReader time (1024 bytes here): offsets are relative to the
// snapshot start and clamped to its end.
func (suite *CircularSuite) TestRegularSeek() {
	buf, err := circular.NewBuffer(circular.WithInitialCapacity(2048), circular.WithMaxCapacity(65536), circular.WithSafetyGap(256))
	suite.Require().NoError(err)

	_, err = buf.Write(bytes.Repeat([]byte{0xff}, 512))
	suite.Require().NoError(err)

	_, err = buf.Write(bytes.Repeat([]byte{0xfe}, 512))
	suite.Require().NoError(err)

	r := buf.GetReader()

	// written after the snapshot; must not be visible to r
	_, err = buf.Write(bytes.Repeat([]byte{0xfc}, 512))
	suite.Require().NoError(err)

	off, err := r.Seek(0, io.SeekCurrent)
	suite.Require().NoError(err)
	suite.Assert().EqualValues(0, off)

	data := make([]byte, 256)

	n, err := r.Read(data)
	suite.Require().NoError(err)
	suite.Assert().Equal(256, n)
	suite.Assert().Equal(bytes.Repeat([]byte{0xff}, 256), data)

	off, err = r.Seek(0, io.SeekCurrent)
	suite.Require().NoError(err)
	suite.Assert().EqualValues(256, off)

	off, err = r.Seek(-256, io.SeekEnd)
	suite.Require().NoError(err)
	suite.Assert().EqualValues(768, off)

	n, err = r.Read(data)
	suite.Require().NoError(err)
	suite.Assert().Equal(256, n)
	suite.Assert().Equal(bytes.Repeat([]byte{0xfe}, 256), data)

	// seeking past the snapshot end clamps to endOff (1024)
	off, err = r.Seek(2048, io.SeekStart)
	suite.Require().NoError(err)
	suite.Assert().EqualValues(1024, off)

	// overrun the buffer so the reader's position becomes stale
	_, err = buf.Write(bytes.Repeat([]byte{0xfe}, 65536-256))
	suite.Require().NoError(err)

	off, err = r.Seek(0, io.SeekStart)
	suite.Require().NoError(err)
	suite.Assert().EqualValues(0, off)

	_, err = r.Seek(-100, io.SeekStart)
	suite.Require().Equal(circular.ErrSeekBeforeStart, err)

	// after overrun, reads fail with ErrOutOfSync
	_, err = r.Read(nil)
	suite.Require().Equal(circular.ErrOutOfSync, err)
}
|
||||
|
||||
func TestCircularSuite(t *testing.T) {
|
||||
suite.Run(t, new(CircularSuite))
|
||||
}
|
||||
@ -1,16 +0,0 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package circular
|
||||
|
||||
import "errors"
|
||||
|
||||
// Sentinel errors returned by Reader and StreamingReader; callers compare
// against them directly (or via errors.Is).
var (
	// ErrClosed is raised on read from closed Reader.
	ErrClosed = errors.New("reader is closed")

	// ErrSeekBeforeStart is raised when seek goes beyond start of the file.
	ErrSeekBeforeStart = errors.New("seek before start")

	// ErrOutOfSync is raised when reader got too much out of sync with the writer.
	ErrOutOfSync = errors.New("buffer overrun, read position overwritten")
)
|
||||
@ -1,69 +0,0 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package circular
|
||||
|
||||
import "fmt"
|
||||
|
||||
// Options defines settings for Buffer.
type Options struct {
	// InitialCapacity is the number of bytes allocated when the buffer is created.
	InitialCapacity int
	// MaxCapacity is the upper bound on buffer growth; once reached, writes wrap around.
	MaxCapacity int
	// SafetyGap reserves distance between the writer and a new reader's start
	// position so concurrent writes don't immediately overrun the reader.
	SafetyGap int
}
|
||||
|
||||
// defaultOptions returns default initial values.
|
||||
func defaultOptions() Options {
|
||||
return Options{
|
||||
InitialCapacity: 16384,
|
||||
MaxCapacity: 1048576,
|
||||
SafetyGap: 1024,
|
||||
}
|
||||
}
|
||||
|
||||
// OptionFunc allows setting Buffer options.
//
// An OptionFunc returns an error when the supplied value is invalid.
type OptionFunc func(*Options) error
|
||||
|
||||
// WithInitialCapacity sets initial buffer capacity.
|
||||
func WithInitialCapacity(capacity int) OptionFunc {
|
||||
return func(opt *Options) error {
|
||||
if capacity <= 0 {
|
||||
return fmt.Errorf("initial capacity should be positive: %d", capacity)
|
||||
}
|
||||
|
||||
opt.InitialCapacity = capacity
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithMaxCapacity sets maximum buffer capacity.
|
||||
func WithMaxCapacity(capacity int) OptionFunc {
|
||||
return func(opt *Options) error {
|
||||
if capacity <= 0 {
|
||||
return fmt.Errorf("max capacity should be positive: %d", capacity)
|
||||
}
|
||||
|
||||
opt.MaxCapacity = capacity
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithSafetyGap sets safety gap between readers and writers to avoid buffer overrun for the reader.
|
||||
//
|
||||
// Reader initial position is set to be as far as possible in the buffer history, but next concurrent write
|
||||
// might overwrite read position, and safety gap helps to prevent it. With safety gap, maximum available
|
||||
// bytes to read are: MaxCapacity-SafetyGap.
|
||||
func WithSafetyGap(gap int) OptionFunc {
|
||||
return func(opt *Options) error {
|
||||
if gap <= 0 {
|
||||
return fmt.Errorf("safety gap should be positive: %q", gap)
|
||||
}
|
||||
|
||||
opt.SafetyGap = gap
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -1,103 +0,0 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package circular
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// Reader implements seekable reader with local position in the Buffer which
// reads from the fixed part of the buffer.
//
// Reader is not safe to be used with concurrent Read/Seek operations.
type Reader struct {
	buf *Buffer

	// snapshot of the readable range captured when the Reader was created
	startOff, endOff int64
	// current read position, startOff <= off <= endOff
	off int64

	// closed flag (0/1), accessed atomically
	closed uint32
}
|
||||
|
||||
// Read implements io.Reader.
//
// Read copies from the fixed snapshot [startOff, endOff) captured when the
// Reader was created; it returns io.EOF at endOff, ErrClosed after Close,
// and ErrOutOfSync if the writer has lapped the read position.
func (r *Reader) Read(p []byte) (n int, err error) {
	if atomic.LoadUint32(&r.closed) > 0 {
		err = ErrClosed

		return
	}

	r.buf.mu.Lock()
	defer r.buf.mu.Unlock()

	if r.off < r.buf.off-int64(r.buf.opt.MaxCapacity) {
		// reader is falling too much behind
		err = ErrOutOfSync

		return
	}

	if r.off == r.endOff {
		err = io.EOF

		return
	}

	if len(p) == 0 {
		return
	}

	n = int(r.endOff - r.off)
	if n > len(p) {
		n = len(p)
	}

	// physical position of the read offset within the circular buffer
	i := int(r.off % int64(r.buf.opt.MaxCapacity))

	// if the span wraps past the physical end of the buffer, copy in two parts
	if l := r.buf.opt.MaxCapacity - i; l < n {
		copy(p, r.buf.data[i:])
		copy(p[l:], r.buf.data[:n-l])
	} else {
		copy(p, r.buf.data[i:i+n])
	}

	r.off += int64(n)

	return n, err
}
|
||||
|
||||
// Close implements io.Closer.
//
// Close only marks the reader closed — subsequent Read calls fail with
// ErrClosed; it never returns an error.
func (r *Reader) Close() error {
	atomic.StoreUint32(&r.closed, 1)

	return nil
}
|
||||
|
||||
// Seek implements io.Seeker.
|
||||
func (r *Reader) Seek(offset int64, whence int) (int64, error) {
|
||||
newOff := r.off
|
||||
|
||||
switch whence {
|
||||
case io.SeekCurrent:
|
||||
newOff += offset
|
||||
case io.SeekEnd:
|
||||
newOff = r.endOff + offset
|
||||
case io.SeekStart:
|
||||
newOff = r.startOff + offset
|
||||
}
|
||||
|
||||
if newOff < r.startOff {
|
||||
return r.off - r.startOff, ErrSeekBeforeStart
|
||||
}
|
||||
|
||||
if newOff > r.endOff {
|
||||
newOff = r.endOff
|
||||
}
|
||||
|
||||
r.off = newOff
|
||||
|
||||
return r.off - r.startOff, nil
|
||||
}
|
||||
@ -1,117 +0,0 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package circular
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// StreamingReader implements seekable reader with local position in the Buffer.
//
// StreamingReader is not safe to be used with concurrent Read/Seek operations.
//
// StreamingReader blocks for new data once it exhausts contents of the buffer.
type StreamingReader struct {
	buf *Buffer

	// position the reader started from, captured at creation time
	initialOff int64
	// current read position
	off int64

	// closed flag (0/1), accessed atomically
	closed uint32
}
|
||||
|
||||
// Read implements io.Reader.
|
||||
func (r *StreamingReader) Read(p []byte) (n int, err error) {
|
||||
if atomic.LoadUint32(&r.closed) > 0 {
|
||||
err = ErrClosed
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if len(p) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
r.buf.mu.Lock()
|
||||
defer r.buf.mu.Unlock()
|
||||
|
||||
if r.off < r.buf.off-int64(r.buf.opt.MaxCapacity) {
|
||||
// reader is falling too much behind, so need to rewind to the first available position
|
||||
r.off = r.buf.off - int64(r.buf.opt.MaxCapacity)
|
||||
}
|
||||
|
||||
for r.off == r.buf.off {
|
||||
r.buf.cond.Wait()
|
||||
|
||||
if atomic.LoadUint32(&r.closed) > 0 {
|
||||
err = ErrClosed
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
n = int(r.buf.off - r.off)
|
||||
if n > len(p) {
|
||||
n = len(p)
|
||||
}
|
||||
|
||||
i := int(r.off % int64(r.buf.opt.MaxCapacity))
|
||||
|
||||
if l := r.buf.opt.MaxCapacity - i; l < n {
|
||||
copy(p, r.buf.data[i:])
|
||||
copy(p[l:], r.buf.data[:n-l])
|
||||
} else {
|
||||
copy(p, r.buf.data[i:i+n])
|
||||
}
|
||||
|
||||
r.off += int64(n)
|
||||
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Close implements io.Closer.
|
||||
func (r *StreamingReader) Close() error {
|
||||
if atomic.CompareAndSwapUint32(&r.closed, 0, 1) {
|
||||
// wake up readers
|
||||
r.buf.cond.Broadcast()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Seek implements io.Seeker.
|
||||
func (r *StreamingReader) Seek(offset int64, whence int) (int64, error) {
|
||||
newOff := r.off
|
||||
|
||||
r.buf.mu.Lock()
|
||||
writeOff := r.buf.off
|
||||
r.buf.mu.Unlock()
|
||||
|
||||
switch whence {
|
||||
case io.SeekCurrent:
|
||||
newOff += offset
|
||||
case io.SeekEnd:
|
||||
newOff = writeOff + offset
|
||||
case io.SeekStart:
|
||||
newOff = r.initialOff + offset
|
||||
}
|
||||
|
||||
if newOff < r.initialOff {
|
||||
return r.off - r.initialOff, ErrSeekBeforeStart
|
||||
}
|
||||
|
||||
if newOff > writeOff {
|
||||
newOff = writeOff
|
||||
}
|
||||
|
||||
if newOff < writeOff-int64(r.buf.opt.MaxCapacity-r.buf.opt.SafetyGap) {
|
||||
newOff = writeOff - int64(r.buf.opt.MaxCapacity-r.buf.opt.SafetyGap)
|
||||
}
|
||||
|
||||
r.off = newOff
|
||||
|
||||
return r.off - r.initialOff, nil
|
||||
}
|
||||
@ -9,12 +9,13 @@ import (
|
||||
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
"github.com/talos-systems/talos/pkg/machinery/constants"
|
||||
"github.com/talos-systems/talos/pkg/machinery/role"
|
||||
)
|
||||
|
||||
// mdKey is used to store roles in gRPC metadata.
|
||||
// Should be used only in this file.
|
||||
const mdKey = "talos-role"
|
||||
const mdKey = constants.APIAuthzRoleMetadataKey
|
||||
|
||||
// SetMetadata sets given roles in gRPC metadata.
|
||||
func SetMetadata(md metadata.MD, roles role.Set) {
|
||||
|
||||
@ -2,35 +2,5 @@
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
// Package kubeconfig provides Kubernetes config file handling.
|
||||
// Package kubeconfig provides Kubernetes config file generation.
|
||||
package kubeconfig
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// DefaultPath returns path to ~/.kube/config.
|
||||
func DefaultPath() (string, error) {
|
||||
home, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return filepath.Join(home, ".kube/config"), nil
|
||||
}
|
||||
|
||||
// SinglePath parses KUBECONFIG and the default kubeconfig file location
|
||||
// and ensures there is only one to return.
|
||||
func SinglePath() (string, error) {
|
||||
envVarFilePaths := filepath.SplitList(os.Getenv("KUBECONFIG"))
|
||||
switch len(envVarFilePaths) {
|
||||
case 0:
|
||||
return DefaultPath()
|
||||
case 1:
|
||||
return envVarFilePaths[0], nil
|
||||
default:
|
||||
return "", fmt.Errorf("multiple kubeconfig files defined")
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,60 +0,0 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package kubeconfig_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/talos-systems/talos/pkg/kubeconfig"
|
||||
)
|
||||
|
||||
func TestSinglePath(t *testing.T) {
|
||||
expectedDefaultPath, err := kubeconfig.DefaultPath()
|
||||
assert.NoError(t, err)
|
||||
|
||||
for _, tt := range []struct {
|
||||
name string
|
||||
envVar string
|
||||
shouldErr bool
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "NoKUBECONFIGSet",
|
||||
shouldErr: false,
|
||||
expected: expectedDefaultPath,
|
||||
},
|
||||
{
|
||||
name: "UseKUBECONFIGSet",
|
||||
envVar: "/my/custom/path/to/kubeconfig",
|
||||
shouldErr: false,
|
||||
expected: "/my/custom/path/to/kubeconfig",
|
||||
},
|
||||
{
|
||||
name: "MultiKUBECONFIGSet",
|
||||
envVar: "/my/custom/path/to/kubeconfig:/another/path/to/kubeconfig:/foo/bar/kubeconfig",
|
||||
shouldErr: true,
|
||||
},
|
||||
} {
|
||||
tt := tt
|
||||
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if tt.envVar != "" {
|
||||
t.Setenv("KUBECONFIG", tt.envVar)
|
||||
}
|
||||
result, err := kubeconfig.SinglePath()
|
||||
|
||||
if tt.shouldErr {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
assert.Equal(t, tt.expected, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -1,225 +0,0 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package kubeconfig
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
||||
)
|
||||
|
||||
// Merger handles merging of Kubernetes client config files.
|
||||
type Merger clientcmdapi.Config
|
||||
|
||||
// Load the kubeconfig from file.
|
||||
func Load(path string) (*Merger, error) {
|
||||
config, err := clientcmd.LoadFromFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return (*Merger)(config), err
|
||||
}
|
||||
|
||||
// MergeOptions controls Merge process.
|
||||
type MergeOptions struct {
|
||||
ForceContextName string
|
||||
ActivateContext bool
|
||||
ConflictHandler func(ConfigComponent, string) (ConflictDecision, error)
|
||||
OutputWriter io.Writer
|
||||
}
|
||||
|
||||
// ConfigComponent identifies part of kubeconfig.
|
||||
type ConfigComponent string
|
||||
|
||||
// Kubeconfig components.
|
||||
const (
|
||||
Cluster ConfigComponent = "cluster"
|
||||
AuthInfo ConfigComponent = "auth"
|
||||
Context ConfigComponent = "context"
|
||||
)
|
||||
|
||||
// ConflictDecision is returned from ConflictHandler.
|
||||
type ConflictDecision string
|
||||
|
||||
// Conflict decisions.
|
||||
const (
|
||||
OverwriteDecision ConflictDecision = "overwrite"
|
||||
RenameDecision ConflictDecision = "rename"
|
||||
)
|
||||
|
||||
// Merge the provided kubernetes config in.
|
||||
//
|
||||
//nolint:gocyclo,cyclop
|
||||
func (merger *Merger) Merge(config *clientcmdapi.Config, options MergeOptions) error {
|
||||
mappedClusters := map[string]string{}
|
||||
mappedAuthInfos := map[string]string{}
|
||||
mappedContexts := map[string]string{}
|
||||
|
||||
for name, newCluster := range config.Clusters {
|
||||
mergedName := name
|
||||
|
||||
oldCluster, exists := merger.Clusters[mergedName]
|
||||
|
||||
newCluster.LocationOfOrigin = ""
|
||||
|
||||
if oldCluster != nil {
|
||||
oldCluster.LocationOfOrigin = ""
|
||||
}
|
||||
|
||||
if exists && !reflect.DeepEqual(oldCluster, newCluster) {
|
||||
decision, err := options.ConflictHandler(Cluster, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if decision == RenameDecision {
|
||||
mergedName = merger.rename(Cluster, mergedName)
|
||||
}
|
||||
}
|
||||
|
||||
mappedClusters[name] = mergedName
|
||||
}
|
||||
|
||||
for name, newAuthInfo := range config.AuthInfos {
|
||||
mergedName := name
|
||||
|
||||
// apply previous mappings done to cluster names
|
||||
for oldName, newName := range mappedClusters {
|
||||
mergedName = strings.ReplaceAll(mergedName, oldName, newName)
|
||||
}
|
||||
|
||||
oldAuthInfo, exists := merger.AuthInfos[mergedName]
|
||||
|
||||
newAuthInfo.LocationOfOrigin = ""
|
||||
|
||||
if oldAuthInfo != nil {
|
||||
oldAuthInfo.LocationOfOrigin = ""
|
||||
}
|
||||
|
||||
if exists && !reflect.DeepEqual(oldAuthInfo, newAuthInfo) {
|
||||
decision, err := options.ConflictHandler(AuthInfo, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if decision == RenameDecision {
|
||||
mergedName = merger.rename(AuthInfo, mergedName)
|
||||
}
|
||||
}
|
||||
|
||||
mappedAuthInfos[name] = mergedName
|
||||
}
|
||||
|
||||
for name, newContext := range config.Contexts {
|
||||
mergedName := name
|
||||
|
||||
// apply mappings done to authInfo, as authInfo has same format as context in Talos
|
||||
for oldName, newName := range mappedAuthInfos {
|
||||
mergedName = strings.ReplaceAll(mergedName, oldName, newName)
|
||||
}
|
||||
|
||||
if options.ForceContextName != "" {
|
||||
mergedName = options.ForceContextName
|
||||
}
|
||||
|
||||
oldContext, exists := merger.Clusters[mergedName]
|
||||
|
||||
newContext.LocationOfOrigin = ""
|
||||
|
||||
if oldContext != nil {
|
||||
oldContext.LocationOfOrigin = ""
|
||||
}
|
||||
|
||||
if exists && !reflect.DeepEqual(oldContext, newContext) {
|
||||
decision, err := options.ConflictHandler(Cluster, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if decision == RenameDecision {
|
||||
mergedName = merger.rename(Cluster, mergedName)
|
||||
}
|
||||
}
|
||||
|
||||
mappedContexts[name] = mergedName
|
||||
}
|
||||
|
||||
for name, cluster := range config.Clusters {
|
||||
newName := mappedClusters[name]
|
||||
|
||||
if newName != name {
|
||||
fmt.Fprintf(options.OutputWriter, "renamed cluster %q -> %q\n", name, newName)
|
||||
}
|
||||
|
||||
merger.Clusters[newName] = cluster
|
||||
}
|
||||
|
||||
for name, authInfo := range config.AuthInfos {
|
||||
newName := mappedAuthInfos[name]
|
||||
|
||||
if newName != name {
|
||||
fmt.Fprintf(options.OutputWriter, "renamed auth info %q -> %q\n", name, newName)
|
||||
}
|
||||
|
||||
merger.AuthInfos[newName] = authInfo
|
||||
}
|
||||
|
||||
for name, context := range config.Contexts {
|
||||
contextCopy := *context
|
||||
|
||||
newName := mappedContexts[name]
|
||||
|
||||
if newName != name {
|
||||
fmt.Fprintf(options.OutputWriter, "renamed context %q -> %q\n", name, newName)
|
||||
}
|
||||
|
||||
contextCopy.AuthInfo = mappedAuthInfos[contextCopy.AuthInfo]
|
||||
contextCopy.Cluster = mappedClusters[contextCopy.Cluster]
|
||||
|
||||
merger.Contexts[newName] = &contextCopy
|
||||
|
||||
if options.ActivateContext {
|
||||
merger.CurrentContext = newName
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// rename the config component until it gets unique.
|
||||
func (merger *Merger) rename(component ConfigComponent, name string) (newName string) {
|
||||
i := 0
|
||||
newName = name
|
||||
|
||||
for {
|
||||
var exists bool
|
||||
|
||||
switch component {
|
||||
case Cluster:
|
||||
_, exists = merger.Clusters[newName]
|
||||
case AuthInfo:
|
||||
_, exists = merger.AuthInfos[newName]
|
||||
case Context:
|
||||
_, exists = merger.Contexts[newName]
|
||||
}
|
||||
|
||||
if !exists {
|
||||
return newName
|
||||
}
|
||||
|
||||
i++
|
||||
newName = fmt.Sprintf("%s-%d", name, i)
|
||||
}
|
||||
}
|
||||
|
||||
// Write the kubeconfig back to the file.
|
||||
func (merger *Merger) Write(path string) error {
|
||||
return clientcmd.WriteToFile(clientcmdapi.Config(*merger), path)
|
||||
}
|
||||
@ -1,375 +0,0 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package kubeconfig_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
||||
|
||||
"github.com/talos-systems/talos/pkg/kubeconfig"
|
||||
)
|
||||
|
||||
func TestMerger(t *testing.T) {
|
||||
errorAlways := func(kubeconfig.ConfigComponent, string) (kubeconfig.ConflictDecision, error) {
|
||||
return "", fmt.Errorf("shouldn't be here")
|
||||
}
|
||||
renameAlways := func(kubeconfig.ConfigComponent, string) (kubeconfig.ConflictDecision, error) {
|
||||
return kubeconfig.RenameDecision, nil
|
||||
}
|
||||
overwriteAlways := func(kubeconfig.ConfigComponent, string) (kubeconfig.ConflictDecision, error) {
|
||||
return kubeconfig.OverwriteDecision, nil
|
||||
}
|
||||
|
||||
for _, tt := range []struct {
|
||||
name string
|
||||
initial clientcmdapi.Config
|
||||
new clientcmdapi.Config
|
||||
expected clientcmdapi.Config
|
||||
options kubeconfig.MergeOptions
|
||||
}{
|
||||
{ // MergeIntoEmpty
|
||||
name: "MergeIntoEmpty",
|
||||
initial: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{},
|
||||
Contexts: map[string]*clientcmdapi.Context{},
|
||||
},
|
||||
new: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{
|
||||
"foo@bar": {
|
||||
ClientCertificate: "cert1",
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{
|
||||
"bar": {
|
||||
Server: "example.com",
|
||||
},
|
||||
},
|
||||
Contexts: map[string]*clientcmdapi.Context{
|
||||
"foo@bar": {
|
||||
Cluster: "bar",
|
||||
AuthInfo: "foo@bar",
|
||||
},
|
||||
},
|
||||
CurrentContext: "nothing",
|
||||
},
|
||||
expected: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{
|
||||
"foo@bar": {
|
||||
ClientCertificate: "cert1",
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{
|
||||
"bar": {
|
||||
Server: "example.com",
|
||||
},
|
||||
},
|
||||
Contexts: map[string]*clientcmdapi.Context{
|
||||
"foo@bar": {
|
||||
Cluster: "bar",
|
||||
AuthInfo: "foo@bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
options: kubeconfig.MergeOptions{
|
||||
ConflictHandler: errorAlways,
|
||||
OutputWriter: os.Stdout,
|
||||
},
|
||||
},
|
||||
{ // MergeClean
|
||||
name: "MergeClean",
|
||||
initial: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{
|
||||
"foo@bar": {
|
||||
ClientCertificate: "cert1",
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{
|
||||
"bar": {
|
||||
Server: "example.com",
|
||||
},
|
||||
},
|
||||
Contexts: map[string]*clientcmdapi.Context{
|
||||
"foo@bar": {
|
||||
Cluster: "bar",
|
||||
AuthInfo: "foo@bar",
|
||||
},
|
||||
},
|
||||
CurrentContext: "foo@bar",
|
||||
},
|
||||
new: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{
|
||||
"fiz@buzz": {
|
||||
ClientCertificate: "cert2",
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{
|
||||
"buzz": {
|
||||
Server: "another.com",
|
||||
},
|
||||
},
|
||||
Contexts: map[string]*clientcmdapi.Context{
|
||||
"fiz@buzz": {
|
||||
Cluster: "buzz",
|
||||
AuthInfo: "fiz@buzz",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{
|
||||
"foo@bar": {
|
||||
ClientCertificate: "cert1",
|
||||
},
|
||||
"fiz@buzz": {
|
||||
ClientCertificate: "cert2",
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{
|
||||
"bar": {
|
||||
Server: "example.com",
|
||||
},
|
||||
"buzz": {
|
||||
Server: "another.com",
|
||||
},
|
||||
},
|
||||
Contexts: map[string]*clientcmdapi.Context{
|
||||
"foo@bar": {
|
||||
Cluster: "bar",
|
||||
AuthInfo: "foo@bar",
|
||||
},
|
||||
"fiz@buzz": {
|
||||
Cluster: "buzz",
|
||||
AuthInfo: "fiz@buzz",
|
||||
},
|
||||
},
|
||||
CurrentContext: "fiz@buzz",
|
||||
},
|
||||
options: kubeconfig.MergeOptions{
|
||||
ActivateContext: true,
|
||||
ConflictHandler: errorAlways,
|
||||
OutputWriter: os.Stdout,
|
||||
},
|
||||
},
|
||||
{ // MergeRename
|
||||
name: "MergeRename",
|
||||
initial: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{
|
||||
"foo@bar": {
|
||||
ClientCertificate: "cert1",
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{
|
||||
"bar": {
|
||||
Server: "example.com",
|
||||
},
|
||||
},
|
||||
Contexts: map[string]*clientcmdapi.Context{
|
||||
"foo@bar": {
|
||||
Cluster: "bar",
|
||||
AuthInfo: "foo@bar",
|
||||
},
|
||||
},
|
||||
CurrentContext: "foo@bar",
|
||||
},
|
||||
new: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{
|
||||
"foo@bar": {
|
||||
ClientCertificate: "cert2",
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{
|
||||
"bar": {
|
||||
Server: "another.com",
|
||||
},
|
||||
},
|
||||
Contexts: map[string]*clientcmdapi.Context{
|
||||
"foo@bar": {
|
||||
Cluster: "bar",
|
||||
AuthInfo: "foo@bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{
|
||||
"foo@bar": {
|
||||
ClientCertificate: "cert1",
|
||||
},
|
||||
"foo@bar-1": {
|
||||
ClientCertificate: "cert2",
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{
|
||||
"bar": {
|
||||
Server: "example.com",
|
||||
},
|
||||
"bar-1": {
|
||||
Server: "another.com",
|
||||
},
|
||||
},
|
||||
Contexts: map[string]*clientcmdapi.Context{
|
||||
"foo@bar": {
|
||||
Cluster: "bar",
|
||||
AuthInfo: "foo@bar",
|
||||
},
|
||||
"foo@bar-1": {
|
||||
Cluster: "bar-1",
|
||||
AuthInfo: "foo@bar-1",
|
||||
},
|
||||
},
|
||||
CurrentContext: "foo@bar",
|
||||
},
|
||||
options: kubeconfig.MergeOptions{
|
||||
ConflictHandler: renameAlways,
|
||||
OutputWriter: os.Stdout,
|
||||
},
|
||||
},
|
||||
{ // MergeOverwrite
|
||||
name: "MergeOverwrite",
|
||||
initial: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{
|
||||
"foo@bar": {
|
||||
ClientCertificate: "cert1",
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{
|
||||
"bar": {
|
||||
Server: "example.com",
|
||||
},
|
||||
},
|
||||
Contexts: map[string]*clientcmdapi.Context{
|
||||
"foo@bar": {
|
||||
Cluster: "bar",
|
||||
AuthInfo: "foo@bar",
|
||||
},
|
||||
},
|
||||
CurrentContext: "foo@bar",
|
||||
},
|
||||
new: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{
|
||||
"foo@bar": {
|
||||
ClientCertificate: "cert2",
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{
|
||||
"bar": {
|
||||
Server: "another.com",
|
||||
},
|
||||
},
|
||||
Contexts: map[string]*clientcmdapi.Context{
|
||||
"foo@bar": {
|
||||
Cluster: "bar",
|
||||
AuthInfo: "foo@bar",
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{
|
||||
"foo@bar": {
|
||||
ClientCertificate: "cert2",
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{
|
||||
"bar": {
|
||||
Server: "another.com",
|
||||
},
|
||||
},
|
||||
Contexts: map[string]*clientcmdapi.Context{
|
||||
"foo@bar": {
|
||||
Cluster: "bar",
|
||||
AuthInfo: "foo@bar",
|
||||
},
|
||||
},
|
||||
CurrentContext: "foo@bar",
|
||||
},
|
||||
options: kubeconfig.MergeOptions{
|
||||
ConflictHandler: overwriteAlways,
|
||||
OutputWriter: os.Stdout,
|
||||
},
|
||||
},
|
||||
{ // MergeEqual
|
||||
name: "MergeEqual",
|
||||
initial: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{
|
||||
"foo@bar": {
|
||||
ClientCertificate: "cert1",
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{
|
||||
"bar": {
|
||||
Server: "example.com",
|
||||
},
|
||||
},
|
||||
Contexts: map[string]*clientcmdapi.Context{
|
||||
"foo@bar": {
|
||||
Cluster: "bar",
|
||||
AuthInfo: "foo@bar",
|
||||
},
|
||||
},
|
||||
CurrentContext: "foo@bar",
|
||||
},
|
||||
new: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{
|
||||
"foo@bar": {
|
||||
ClientCertificate: "cert1",
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{
|
||||
"bar": {
|
||||
Server: "example.com",
|
||||
},
|
||||
},
|
||||
Contexts: map[string]*clientcmdapi.Context{
|
||||
"foo@bar": {
|
||||
Cluster: "bar",
|
||||
AuthInfo: "foo@bar",
|
||||
},
|
||||
},
|
||||
CurrentContext: "foo@bar",
|
||||
},
|
||||
expected: clientcmdapi.Config{
|
||||
AuthInfos: map[string]*clientcmdapi.AuthInfo{
|
||||
"foo@bar": {
|
||||
ClientCertificate: "cert1",
|
||||
},
|
||||
},
|
||||
Clusters: map[string]*clientcmdapi.Cluster{
|
||||
"bar": {
|
||||
Server: "example.com",
|
||||
},
|
||||
},
|
||||
Contexts: map[string]*clientcmdapi.Context{
|
||||
"foo@bar": {
|
||||
Cluster: "bar",
|
||||
AuthInfo: "foo@bar",
|
||||
},
|
||||
},
|
||||
CurrentContext: "foo@bar",
|
||||
},
|
||||
options: kubeconfig.MergeOptions{
|
||||
ConflictHandler: errorAlways,
|
||||
OutputWriter: os.Stdout,
|
||||
},
|
||||
},
|
||||
} {
|
||||
tt := tt
|
||||
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
merger := kubeconfig.Merger(*tt.initial.DeepCopy())
|
||||
|
||||
err := merger.Merge(&tt.new, tt.options)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, tt.expected.Clusters, merger.Clusters)
|
||||
assert.Equal(t, tt.expected.AuthInfos, merger.AuthInfos)
|
||||
assert.Equal(t, tt.expected.Contexts, merger.Contexts)
|
||||
assert.Equal(t, tt.expected.CurrentContext, merger.CurrentContext)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -761,6 +761,9 @@ const (
|
||||
|
||||
// TrustdMaxProcs is the maximum number of GOMAXPROCS for trustd.
|
||||
TrustdMaxProcs = 2
|
||||
|
||||
// APIAuthzRoleMetadataKey is the gRPC metadata key used to submit a role with os:impersonator.
|
||||
APIAuthzRoleMetadataKey = "talos-role"
|
||||
)
|
||||
|
||||
// See https://linux.die.net/man/3/klogctl
|
||||
|
||||
@ -12,8 +12,9 @@ import (
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/siderolabs/go-tail"
|
||||
|
||||
"github.com/talos-systems/talos/pkg/provision"
|
||||
"github.com/talos-systems/talos/pkg/tail"
|
||||
)
|
||||
|
||||
// CrashDump produces debug information to help with debugging failures.
|
||||
|
||||
@ -1,94 +0,0 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
// Package tail implements tailing of [io.ReadSeeker].
|
||||
package tail
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
)
|
||||
|
||||
// Window is the read window size for tail scanning.
|
||||
const Window = 4096
|
||||
|
||||
// SeekLines seeks the passed io.ReadSeeker so that it's -N lines from the tail.
|
||||
//
|
||||
// SeekLines might modify file offset even in case of error.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
func SeekLines(r io.ReadSeeker, lines int) error {
|
||||
offset, err := r.Seek(0, io.SeekEnd)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
readOffset := offset - Window
|
||||
if readOffset < 0 {
|
||||
readOffset = 0
|
||||
}
|
||||
|
||||
readSize := offset - readOffset
|
||||
|
||||
skippedLines := -1 // we need to skip (lines + 1) \n characters to find position to read from
|
||||
|
||||
buf := make([]byte, Window)
|
||||
firstRead := true
|
||||
|
||||
for skippedLines < lines && readSize > 0 {
|
||||
_, err = r.Seek(readOffset, io.SeekStart)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var n int
|
||||
|
||||
n, err = r.Read(buf[:readSize])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if int64(n) != readSize {
|
||||
return fmt.Errorf("unexpected short read: %d != %d", n, readSize)
|
||||
}
|
||||
|
||||
if firstRead && buf[n-1] != '\n' {
|
||||
// last line might not have '\n'
|
||||
skippedLines++
|
||||
}
|
||||
|
||||
firstRead = false
|
||||
|
||||
for n > 0 && skippedLines < lines {
|
||||
index := bytes.LastIndexByte(buf[:n], '\n')
|
||||
if index == -1 {
|
||||
break
|
||||
}
|
||||
|
||||
skippedLines++
|
||||
|
||||
n = index
|
||||
}
|
||||
|
||||
if skippedLines == lines {
|
||||
readOffset += int64(n) + 1
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
offset = readOffset
|
||||
readOffset -= Window
|
||||
|
||||
if readOffset < 0 {
|
||||
readOffset = 0
|
||||
}
|
||||
|
||||
readSize = offset - readOffset
|
||||
}
|
||||
|
||||
_, err = r.Seek(readOffset, io.SeekStart)
|
||||
|
||||
return err
|
||||
}
|
||||
@ -1,96 +0,0 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package tail_test
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/talos-systems/talos/pkg/tail"
|
||||
)
|
||||
|
||||
func TestSkipLines(t *testing.T) {
|
||||
for _, test := range []struct {
|
||||
desc string
|
||||
input []byte
|
||||
tailLines []int
|
||||
expectLines []int
|
||||
}{
|
||||
{
|
||||
desc: "empty",
|
||||
input: nil,
|
||||
tailLines: []int{0, 1, 2, 10},
|
||||
expectLines: []int{0, 0, 0, 0},
|
||||
},
|
||||
{
|
||||
desc: "enormous line",
|
||||
input: bytes.Repeat([]byte{0xde, 0xad, 0xbe, 0xef}, 8000),
|
||||
tailLines: []int{0, 1, 10},
|
||||
expectLines: []int{0, 1, 1},
|
||||
},
|
||||
{
|
||||
desc: "enormous line with \\n",
|
||||
input: append(bytes.Repeat([]byte{0xde, 0xad, 0xbe, 0xef}, 8000), '\n'),
|
||||
tailLines: []int{0, 1, 10},
|
||||
expectLines: []int{0, 1, 1},
|
||||
},
|
||||
{
|
||||
desc: "many small lines",
|
||||
input: bytes.Repeat([]byte{0xde, 0xad, 0xbe, 0xef, '\n'}, 1024),
|
||||
tailLines: []int{0, 1, 3, 10, 100, 1000},
|
||||
expectLines: []int{0, 1, 3, 10, 100, 1000},
|
||||
},
|
||||
{
|
||||
desc: "many small aligned lines",
|
||||
input: bytes.Repeat([]byte{0xde, 0xad, 0xbe, '\n'}, 1024),
|
||||
tailLines: []int{0, 1, 3, 10, 100, 1000},
|
||||
expectLines: []int{0, 1, 3, 10, 100, 1000},
|
||||
},
|
||||
{
|
||||
desc: "empty lines",
|
||||
input: bytes.Repeat([]byte{'\n'}, 65536),
|
||||
tailLines: []int{0, 1, 3, 10, 100, 1000, 10000},
|
||||
expectLines: []int{0, 1, 3, 10, 100, 1000, 10000},
|
||||
},
|
||||
{
|
||||
desc: "window-sized lines",
|
||||
input: bytes.Repeat(append(bytes.Repeat([]byte{'a'}, tail.Window-1), '\n'), 24),
|
||||
tailLines: []int{0, 1, 3, 10, 100, 1000},
|
||||
expectLines: []int{0, 1, 3, 10, 24, 24},
|
||||
},
|
||||
{
|
||||
desc: "long lines",
|
||||
input: bytes.Repeat(append(bytes.Repeat([]byte{'a'}, 356), '\n'), 24),
|
||||
tailLines: []int{0, 1, 3, 10, 15, 24, 100, 1000},
|
||||
expectLines: []int{0, 1, 3, 10, 15, 24, 24, 24},
|
||||
},
|
||||
} {
|
||||
for i, lines := range test.tailLines {
|
||||
r := bytes.NewReader(test.input)
|
||||
|
||||
err := tail.SeekLines(r, lines)
|
||||
assert.NoError(t, err, "test %q", test.desc)
|
||||
|
||||
tailOffset, _ := r.Seek(0, io.SeekCurrent) //nolint:errcheck
|
||||
|
||||
expected := test.expectLines[i]
|
||||
actual := 0
|
||||
|
||||
scanner := bufio.NewScanner(r)
|
||||
|
||||
for scanner.Scan() {
|
||||
actual++
|
||||
}
|
||||
|
||||
assert.NoError(t, scanner.Err(), "test %q", test.desc)
|
||||
|
||||
assert.Equal(t, expected, actual, "test %q, tailOffset %d", test.desc, tailOffset)
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user