talos/pkg/chunker/stream/stream_test.go
Andrey Smirnov c7b25430bb
fix: multiple small fixes for service runners
It all started debugging the issue on Talos being stuck on reboot when
`talosctl logs -f kubelet` is being used.

Fixes:

* abort goroutine runner even if the goroutine doesn't terminate - we
  have no way to force termination, so at least don't hang forever
* align timeouts for apid/trustd for graceful termination - so that at
  least the service is not SIGKILLed while it does its own graceful
  shutdown
* in stream chunker, act on canceled context immediately instead of
  relying on `Read` to return: with `logs -f` the reader will block
  forever waiting for new logs

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
2024-11-29 17:43:03 +04:00

132 lines
3.0 KiB
Go

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package stream_test
import (
"context"
"io"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/siderolabs/talos/pkg/chunker/stream"
)
// StreamChunkerSuite exercises the stream chunker against an in-memory
// io.Pipe: each test feeds bytes into writer and collects the chunked
// output produced from reader.
type StreamChunkerSuite struct {
	suite.Suite

	reader *io.PipeReader // read end handed to the chunker under test
	writer *io.PipeWriter // write end driven by the test cases
}
// SetupTest creates a fresh in-memory pipe for each test case so tests
// never observe each other's buffered data.
func (suite *StreamChunkerSuite) SetupTest() {
	r, w := io.Pipe()

	suite.reader = r
	suite.writer = w
}
// TearDownTest closes both ends of the pipe, writer first, requiring
// that each Close succeeds.
func (suite *StreamChunkerSuite) TearDownTest() {
	for _, end := range []io.Closer{suite.writer, suite.reader} {
		suite.Require().NoError(end.Close())
	}
}
// collectChunks drains chunksCh in a background goroutine, concatenating
// every chunk it receives, and delivers the combined result on the
// returned channel once chunksCh is closed.
func collectChunks(chunksCh <-chan []byte) <-chan []byte {
	resultCh := make(chan []byte)

	go func() {
		var combined []byte

		for part := range chunksCh {
			combined = append(combined, part...)
		}

		resultCh <- combined
	}()

	return resultCh
}
// TestStreaming verifies that everything written to the pipe before the
// writer is closed is delivered by the chunker as a single combined
// stream, across two write batches.
func (suite *StreamChunkerSuite) TestStreaming() {
	ctx, ctxCancel := context.WithCancel(context.Background())
	defer ctxCancel()

	chunker := stream.NewChunker(ctx, suite.reader)
	combinedCh := collectChunks(chunker.Read())

	for _, part := range []string{"abc", "def", "ghi"} {
		suite.writer.Write([]byte(part)) //nolint:errcheck
	}

	// Let the chunker drain the first batch before the second arrives.
	time.Sleep(50 * time.Millisecond)

	for _, part := range []string{"jkl", "mno"} {
		suite.writer.Write([]byte(part)) //nolint:errcheck
	}

	suite.Require().NoError(suite.writer.Close())
	suite.Require().Equal([]byte("abcdefghijklmno"), <-combinedCh)
}
// TestStreamingSmallBuf repeats the streaming scenario with a one-byte
// chunker buffer, forcing the data through one byte at a time.
func (suite *StreamChunkerSuite) TestStreamingSmallBuf() {
	ctx, ctxCancel := context.WithCancel(context.Background())
	defer ctxCancel()

	chunker := stream.NewChunker(ctx, suite.reader, stream.Size(1))
	combinedCh := collectChunks(chunker.Read())

	for _, part := range []string{"abc", "def", "ghi"} {
		suite.writer.Write([]byte(part)) //nolint:errcheck
	}

	// Let the chunker drain the first batch before the second arrives.
	time.Sleep(50 * time.Millisecond)

	for _, part := range []string{"jkl", "mno"} {
		suite.writer.Write([]byte(part)) //nolint:errcheck
	}

	suite.Require().NoError(suite.writer.Close())
	suite.Require().Equal([]byte("abcdefghijklmno"), <-combinedCh)
}
// TestStreamingCancel ends the stream by canceling the context instead
// of closing the writer: the chunker must terminate on cancellation
// even though a pending Read on the pipe would block forever.
func (suite *StreamChunkerSuite) TestStreamingCancel() {
	ctx, ctxCancel := context.WithCancel(context.Background())
	defer ctxCancel()

	chunker := stream.NewChunker(ctx, suite.reader)
	combinedCh := collectChunks(chunker.Read())

	for _, part := range []string{"abc", "def", "ghi"} {
		suite.writer.Write([]byte(part)) //nolint:errcheck
	}

	// Let the chunker drain the first batch before the second arrives.
	time.Sleep(50 * time.Millisecond)

	for _, part := range []string{"jkl", "mno"} {
		suite.writer.Write([]byte(part)) //nolint:errcheck
	}

	// Give the chunker time to consume everything written so far, then
	// cancel; all bytes written before cancellation must still arrive.
	time.Sleep(50 * time.Millisecond)
	ctxCancel()

	suite.Require().Equal([]byte("abcdefghijklmno"), <-combinedCh)
}
// TestStreamChunkerSuite wires the suite into the standard `go test` runner.
func TestStreamChunkerSuite(t *testing.T) {
	suite.Run(t, new(StreamChunkerSuite))
}