mirror of
				https://github.com/traefik/traefik.git
				synced 2025-10-31 00:11:38 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			72 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			72 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package aggregator
 | |
| 
 | |
| import (
 | |
| 	"github.com/traefik/traefik/v2/pkg/config/dynamic"
 | |
| )
 | |
| 
 | |
| // RingChannel implements a channel in a way that never blocks the writer.
 | |
| // Specifically, if a value is written to a RingChannel when its buffer is full then the oldest
 | |
| // value in the buffer is discarded to make room (just like a standard ring-buffer).
 | |
| // Note that Go's scheduler can cause discarded values when they could be avoided, simply by scheduling
 | |
| // the writer before the reader, so caveat emptor.
 | |
| type RingChannel struct {
 | |
| 	input, output chan dynamic.Message
 | |
| 	buffer        *dynamic.Message
 | |
| }
 | |
| 
 | |
| func newRingChannel() *RingChannel {
 | |
| 	ch := &RingChannel{
 | |
| 		input:  make(chan dynamic.Message),
 | |
| 		output: make(chan dynamic.Message),
 | |
| 	}
 | |
| 	go ch.ringBuffer()
 | |
| 	return ch
 | |
| }
 | |
| 
 | |
| func (ch *RingChannel) in() chan<- dynamic.Message {
 | |
| 	return ch.input
 | |
| }
 | |
| 
 | |
| func (ch *RingChannel) out() <-chan dynamic.Message {
 | |
| 	return ch.output
 | |
| }
 | |
| 
 | |
| // for all buffered cases.
 | |
| func (ch *RingChannel) ringBuffer() {
 | |
| 	var input, output chan dynamic.Message
 | |
| 	var next dynamic.Message
 | |
| 	input = ch.input
 | |
| 
 | |
| 	for input != nil || output != nil {
 | |
| 		select {
 | |
| 		// Prefer to write if possible, which is surprisingly effective in reducing
 | |
| 		// dropped elements due to overflow. The naive read/write select chooses randomly
 | |
| 		// when both channels are ready, which produces unnecessary drops 50% of the time.
 | |
| 		case output <- next:
 | |
| 			ch.buffer = nil
 | |
| 		default:
 | |
| 			select {
 | |
| 			case elem, open := <-input:
 | |
| 				if !open {
 | |
| 					input = nil
 | |
| 					break
 | |
| 				}
 | |
| 
 | |
| 				ch.buffer = &elem
 | |
| 			case output <- next:
 | |
| 				ch.buffer = nil
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if ch.buffer == nil {
 | |
| 			output = nil
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		output = ch.output
 | |
| 		next = *ch.buffer
 | |
| 	}
 | |
| 
 | |
| 	close(ch.output)
 | |
| }
 |