Andrey Smirnov 204873e257
refactor: fix filechunker not exiting on context cancel (#668)
This started as a simple unit-test for file chunker, but the first test
hung immediately, so I started looking into the code.

One problem was that when entering inotify() code, ctx cancel wasn't
considered. Another problem is that remove fsnotify was never triggered,
but I saw that with unit-test later.

Small nit was that inotify() was initialized every time we got to EOF,
which is not efficient for "follow" mode.

So I moved inotify into the main loop, and plugged context cancel watch
into the place when chunk is delivered. Chunker code is supposed to
block in two places: when it tries to deliver next chunk (as client
might be slow to recieve buffers) or when there's no new data (on
inotify). So it makes sense to assert context canceled condition in both
cases.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
2019-05-20 18:00:40 +03:00

155 lines
3.2 KiB
Go

/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package file
import (
"context"
"io"
"log"
"os"
"path/filepath"
"github.com/talos-systems/talos/internal/pkg/chunker"
"gopkg.in/fsnotify.v1"
)
// Options is the functional options struct.
type Options struct {
Size int
}
// Option is the functional option func.
type Option func(*Options)
// Size sets the chunk size of the Chunker.
func Size(s int) Option {
return func(args *Options) {
args.Size = s
}
}
// File is a conecrete type that implements the chunker.Chunker interface.
type File struct {
source Source
options *Options
}
// Source is an interface describing the source of a File.
type Source = *os.File
// NewChunker initializes a Chunker with default values.
func NewChunker(source Source, setters ...Option) chunker.Chunker {
opts := &Options{
Size: 1024,
}
for _, setter := range setters {
setter(opts)
}
return &File{
source,
opts,
}
}
// Read implements ChunkReader.
//
// nolint: gocyclo
func (c *File) Read(ctx context.Context) <-chan []byte {
// Create a buffered channel of length 1.
ch := make(chan []byte, 1)
filename := c.source.Name()
go func(ch chan []byte) {
defer close(ch)
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Printf("failed to watch: %v\n", err)
return
}
// nolint: errcheck
defer watcher.Close()
if err = watcher.Add(filepath.Dir(filename)); err != nil {
log.Printf("failed to watch add: %v\n", err)
return
}
offset, err := c.source.Seek(0, io.SeekStart)
if err != nil {
log.Printf("failed to seek: %v\n", err)
return
}
buf := make([]byte, c.options.Size)
for {
for {
n, err := c.source.ReadAt(buf, offset)
if err != nil && err != io.EOF {
log.Printf("read error: %s\n", err.Error())
return
}
offset += int64(n)
if n > 0 {
// Copy the buffer since we will modify it in the next loop.
b := make([]byte, n)
copy(b, buf[:n])
DELIVER:
select {
case <-ctx.Done():
return
case event := <-watcher.Events:
// drain events while waiting for the buffer to be delivered
// otherwise inotify() queue might overflow
if event.Name == filename && event.Op == fsnotify.Write {
// clear EOF condition (if there was one) to make sure
// we read more data
err = nil
}
goto DELIVER
case ch <- b:
}
}
if err == io.EOF || n == 0 {
break
}
}
WATCH:
select {
case <-ctx.Done():
return
case event := <-watcher.Events:
if event.Name != filename {
// ignore events for other files
goto WATCH
}
switch event.Op {
case fsnotify.Write:
// new data, run one more loop copying data back to the client
case fsnotify.Remove:
log.Printf("file was removed while watching: %s", filename)
return
default:
log.Printf("ignoring fsnotify event: %v\n", event)
goto WATCH
}
case err := <-watcher.Errors:
log.Printf("failed to watch: %v\n", err)
return
}
}
}(ch)
return ch
}