mirror of
				https://github.com/traefik/traefik.git
				synced 2025-11-03 18:01:31 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			72 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			72 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package aggregator
 | 
						|
 | 
						|
import (
 | 
						|
	"github.com/traefik/traefik/v2/pkg/config/dynamic"
 | 
						|
)
 | 
						|
 | 
						|
// RingChannel implements a channel in a way that never blocks the writer.
 | 
						|
// Specifically, if a value is written to a RingChannel when its buffer is full then the oldest
 | 
						|
// value in the buffer is discarded to make room (just like a standard ring-buffer).
 | 
						|
// Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling
 | 
						|
// the writer before the reader, so caveat emptor.
 | 
						|
type RingChannel struct {
 | 
						|
	input, output chan dynamic.Message
 | 
						|
	buffer        *dynamic.Message
 | 
						|
}
 | 
						|
 | 
						|
func newRingChannel() *RingChannel {
 | 
						|
	ch := &RingChannel{
 | 
						|
		input:  make(chan dynamic.Message),
 | 
						|
		output: make(chan dynamic.Message),
 | 
						|
	}
 | 
						|
	go ch.ringBuffer()
 | 
						|
	return ch
 | 
						|
}
 | 
						|
 | 
						|
func (ch *RingChannel) in() chan<- dynamic.Message {
 | 
						|
	return ch.input
 | 
						|
}
 | 
						|
 | 
						|
func (ch *RingChannel) out() <-chan dynamic.Message {
 | 
						|
	return ch.output
 | 
						|
}
 | 
						|
 | 
						|
// for all buffered cases.
 | 
						|
func (ch *RingChannel) ringBuffer() {
 | 
						|
	var input, output chan dynamic.Message
 | 
						|
	var next dynamic.Message
 | 
						|
	input = ch.input
 | 
						|
 | 
						|
	for input != nil || output != nil {
 | 
						|
		select {
 | 
						|
		// Prefer to write if possible, which is surprisingly effective in reducing
 | 
						|
		// dropped elements due to overflow. The naive read/write select chooses randomly
 | 
						|
		// when both channels are ready, which produces unnecessary drops 50% of the time.
 | 
						|
		case output <- next:
 | 
						|
			ch.buffer = nil
 | 
						|
		default:
 | 
						|
			select {
 | 
						|
			case elem, open := <-input:
 | 
						|
				if !open {
 | 
						|
					input = nil
 | 
						|
					break
 | 
						|
				}
 | 
						|
 | 
						|
				ch.buffer = &elem
 | 
						|
			case output <- next:
 | 
						|
				ch.buffer = nil
 | 
						|
			}
 | 
						|
		}
 | 
						|
 | 
						|
		if ch.buffer == nil {
 | 
						|
			output = nil
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		output = ch.output
 | 
						|
		next = *ch.buffer
 | 
						|
	}
 | 
						|
 | 
						|
	close(ch.output)
 | 
						|
}
 |