Mirror of https://github.com/siderolabs/omni.git (synced 2025-08-09 11:06:59 +02:00)
This PR brings back a small part of the changes introduced in https://github.com/siderolabs/omni/pull/929, but in a simpler way.

- Copy/import from the `go-loadbalancer` library the structures required for probing a set of upstreams and picking a healthy one.
- Replace the "full" TCP load balancers in the workload proxy handler with these simple upstream-probers (still called `LB`).
- Remove the in-memory transport from the workload proxy completely; instead, use a smart dial function which picks a healthy upstream for the given cluster+alias (a sketch follows below).

This should result in simpler, more robust, and non-leaky proxying compared to the current implementation.

The other changes are left out for now, as they need further evaluation/testing, particularly the aggregated/stopped health checks.

Signed-off-by: Utku Ozdemir <utku.ozdemir@siderolabs.com>
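The "smart dial function" could look roughly like the following minimal sketch: instead of routing through an in-memory transport, the reverse proxy dials whichever upstream the prober for the given cluster+alias currently considers healthy. Everything here apart from the `LB.PickAddress` API is hypothetical — the registry, the key format, the import path, and the names are illustrative, not the actual omni code.

package workloadproxy

import (
	"context"
	"fmt"
	"net"
	"sync"

	"github.com/siderolabs/omni/internal/backend/workloadproxy/lb" // hypothetical import path for the package below
)

// lbRegistry is a hypothetical registry holding one upstream-prober per cluster+alias.
type lbRegistry struct {
	mu  sync.Mutex
	lbs map[string]*lb.LB
}

// dialFor builds a dial function for the given cluster and alias: each call asks the
// corresponding prober for a currently-healthy upstream address and dials it over TCP.
func (r *lbRegistry) dialFor(cluster, alias string) func(ctx context.Context, network, addr string) (net.Conn, error) {
	return func(ctx context.Context, _, _ string) (net.Conn, error) {
		r.mu.Lock()
		balancer, ok := r.lbs[cluster+"/"+alias]
		r.mu.Unlock()

		if !ok {
			return nil, fmt.Errorf("no upstream-prober for cluster %q, alias %q", cluster, alias)
		}

		address, err := balancer.PickAddress()
		if err != nil {
			return nil, fmt.Errorf("failed to pick a healthy upstream: %w", err)
		}

		return (&net.Dialer{}).DialContext(ctx, "tcp", address)
	}
}

A function with this signature can be plugged into an http.Transport as its DialContext, so each proxied request transparently lands on a currently-healthy upstream.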
// Copyright (c) 2025 Sidero Labs, Inc.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

// Package lb contains the logic of running health checks against a set of upstreams and picking a healthy one when requested.
// It takes this logic and the primitives from https://github.com/siderolabs/go-loadbalancer as-is.
package lb

import (
	"context"
	"net"
	"slices"
	"syscall"
	"time"

	"github.com/siderolabs/gen/xslices"
	"github.com/siderolabs/go-loadbalancer/upstream"
	"go.uber.org/zap"
)

// LB is a minimal wrapper around [upstream.List] which provides a simpler interface for picking an upstream address.
type LB struct {
	logger    *zap.Logger
	upstreams *upstream.List[node]
}

// New creates a new load balancer with the given upstream addresses and logger.
func New(upstreamAddresses []string, logger *zap.Logger, options ...upstream.ListOption) (*LB, error) {
	nodes := slices.Values(xslices.Map(upstreamAddresses, func(addr string) node {
		return node{address: addr, logger: logger}
	}))
	nodesEqual := func(a, b node) bool { return a.address == b.address }

	upstreams, err := upstream.NewListWithCmp(nodes, nodesEqual, options...)
	if err != nil {
		return nil, err
	}

	return &LB{
		logger:    logger,
		upstreams: upstreams,
	}, nil
}

// Reconcile updates the list of upstream addresses in the load balancer.
func (lb *LB) Reconcile(upstreamAddresses []string) {
	nodes := slices.Values(xslices.Map(upstreamAddresses, func(addr string) node {
		return node{address: addr, logger: lb.logger}
	}))

	lb.upstreams.Reconcile(nodes)
}

// PickAddress picks a healthy upstream address from the load balancer.
func (lb *LB) PickAddress() (string, error) {
	pickedNode, err := lb.upstreams.Pick()
	if err != nil {
		return "", err
	}

	return pickedNode.address, nil
}

// Shutdown shuts down the load balancer and its upstream health checks.
func (lb *LB) Shutdown() {
	lb.upstreams.Shutdown()
}

// node is an implementation of an [upstream.Backend]. Taken as-is from the upstream package.
type node struct {
	logger  *zap.Logger
	address string // host:port
}
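
// HealthCheck implements the [upstream.Backend] interface: it times a TCP dial to the
// node's address and converts the result into a health tier via calcTier.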
func (upstream node) HealthCheck(ctx context.Context) (upstream.Tier, error) {
	start := time.Now()
	err := upstream.healthCheck(ctx)
	elapsed := time.Since(start)

	return calcTier(err, elapsed)
}

func (upstream node) healthCheck(ctx context.Context) error {
	d := probeDialer()

	c, err := d.DialContext(ctx, "tcp", upstream.address)
	if err != nil {
		upstream.logger.Warn("healthcheck failed", zap.String("address", upstream.address), zap.Error(err))

		return err
	}

	return c.Close()
}
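
// mins holds the lower latency bound of each tier: a probe that completes in at least
// mins[i] (and less than mins[i+1]) lands in tier i, so faster upstreams map to lower tiers.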
var mins = []time.Duration{
	0,
	time.Millisecond / 10,
	time.Millisecond,
	10 * time.Millisecond,
	100 * time.Millisecond,
	1 * time.Second,
}
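
// calcTier converts a health-check result into a tier: on error it returns -1 alongside
// the error, so the node's previous tier is preserved; on success it returns the index of
// the highest latency bound in mins that the elapsed time reached.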
func calcTier(err error, elapsed time.Duration) (upstream.Tier, error) {
	if err != nil {
		// preserve old tier
		return -1, err
	}

	for i := len(mins) - 1; i >= 0; i-- {
		if elapsed >= mins[i] {
			return upstream.Tier(i), nil
		}
	}

	// We should never get here, but there is no way to tell this to the Go compiler.
	return upstream.Tier(len(mins)), err
}
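
// probeDialer returns a dialer tuned for short-lived health-check connections: the
// SO_LINGER setting below keeps closed probe sockets from piling up in TIME-WAIT.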
func probeDialer() *net.Dialer {
	return &net.Dialer{
		// The dialer reduces the TIME-WAIT period to 1 second instead of the OS default of 60 seconds.
		Control: func(_, _ string, c syscall.RawConn) error {
			return c.Control(func(fd uintptr) {
				syscall.SetsockoptLinger(int(fd), syscall.SOL_SOCKET, syscall.SO_LINGER, &syscall.Linger{Onoff: 1, Linger: 1}) //nolint: errcheck
			})
		},
	}
}
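
For completeness, a minimal usage sketch of the package above, assuming a hypothetical import path; the upstream addresses are placeholders.

package main

import (
	"fmt"
	"log"

	"go.uber.org/zap"

	"github.com/siderolabs/omni/internal/backend/workloadproxy/lb" // hypothetical import path
)

func main() {
	logger := zap.NewExample()

	// Start health-checking two placeholder upstream addresses.
	balancer, err := lb.New([]string{"10.0.0.1:8080", "10.0.0.2:8080"}, logger)
	if err != nil {
		log.Fatal(err)
	}
	defer balancer.Shutdown() // stop the background health checks

	// When the set of upstreams changes, reconcile the list in place.
	balancer.Reconcile([]string{"10.0.0.2:8080", "10.0.0.3:8080"})

	// Pick a currently-healthy upstream to dial.
	address, err := balancer.PickAddress()
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("picked upstream:", address)
}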