mirror of
https://github.com/siderolabs/talos.git
synced 2026-05-05 04:16:21 +02:00
It all started debugging the issue on Talos being stuck on reboot when `talosctl logs -f kubelet` is being used. Fixes: * abort goroutine runner even if the goroutine doesn't terminate - we have no way to force termination, so at least don't hang forever * align timeouts for apid/trustd for graceful termination - so that at least the service is not SIGKILLed while it does its own graceful shutdown * in stream chunker, act on canceled context immediately instead of relying on `Read` to return: with `logs -f` the reader will block forever waiting for new logs Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
132 lines
3.0 KiB
Go
132 lines
3.0 KiB
Go
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
package stream_test
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/suite"
|
|
|
|
"github.com/siderolabs/talos/pkg/chunker/stream"
|
|
)
|
|
|
|
type StreamChunkerSuite struct {
|
|
suite.Suite
|
|
|
|
reader *io.PipeReader
|
|
writer *io.PipeWriter
|
|
}
|
|
|
|
func (suite *StreamChunkerSuite) SetupTest() {
|
|
suite.reader, suite.writer = io.Pipe()
|
|
}
|
|
|
|
func (suite *StreamChunkerSuite) TearDownTest() {
|
|
suite.Require().NoError(suite.writer.Close())
|
|
suite.Require().NoError(suite.reader.Close())
|
|
}
|
|
|
|
func collectChunks(chunksCh <-chan []byte) <-chan []byte {
|
|
combinedCh := make(chan []byte)
|
|
|
|
go func() {
|
|
res := []byte(nil)
|
|
|
|
for chunk := range chunksCh {
|
|
res = append(res, chunk...)
|
|
}
|
|
|
|
combinedCh <- res
|
|
}()
|
|
|
|
return combinedCh
|
|
}
|
|
|
|
func (suite *StreamChunkerSuite) TestStreaming() {
|
|
ctx, ctxCancel := context.WithCancel(context.Background())
|
|
defer ctxCancel()
|
|
|
|
chunker := stream.NewChunker(ctx, suite.reader)
|
|
|
|
chunksCh := chunker.Read()
|
|
combinedCh := collectChunks(chunksCh)
|
|
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("abc"))
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("def"))
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("ghi"))
|
|
time.Sleep(50 * time.Millisecond)
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("jkl"))
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("mno"))
|
|
|
|
suite.Require().NoError(suite.writer.Close())
|
|
|
|
suite.Require().Equal([]byte("abcdefghijklmno"), <-combinedCh)
|
|
}
|
|
|
|
func (suite *StreamChunkerSuite) TestStreamingSmallBuf() {
|
|
ctx, ctxCancel := context.WithCancel(context.Background())
|
|
defer ctxCancel()
|
|
|
|
chunker := stream.NewChunker(ctx, suite.reader, stream.Size(1))
|
|
|
|
chunksCh := chunker.Read()
|
|
combinedCh := collectChunks(chunksCh)
|
|
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("abc"))
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("def"))
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("ghi"))
|
|
time.Sleep(50 * time.Millisecond)
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("jkl"))
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("mno"))
|
|
|
|
suite.Require().NoError(suite.writer.Close())
|
|
|
|
suite.Require().Equal([]byte("abcdefghijklmno"), <-combinedCh)
|
|
}
|
|
|
|
func (suite *StreamChunkerSuite) TestStreamingCancel() {
|
|
ctx, ctxCancel := context.WithCancel(context.Background())
|
|
defer ctxCancel()
|
|
|
|
chunker := stream.NewChunker(ctx, suite.reader)
|
|
|
|
chunksCh := chunker.Read()
|
|
combinedCh := collectChunks(chunksCh)
|
|
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("abc"))
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("def"))
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("ghi"))
|
|
time.Sleep(50 * time.Millisecond)
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("jkl"))
|
|
//nolint:errcheck
|
|
suite.writer.Write([]byte("mno"))
|
|
time.Sleep(50 * time.Millisecond)
|
|
|
|
ctxCancel()
|
|
|
|
suite.Require().Equal([]byte("abcdefghijklmno"), <-combinedCh)
|
|
}
|
|
|
|
func TestStreamChunkerSuite(t *testing.T) {
|
|
suite.Run(t, new(StreamChunkerSuite))
|
|
}
|