Andrey Smirnov 0454130ad9
feat: suppress controller runtime first N failures on the console
As controllers might fail with transient errors on machine startup,
and errors are always retried, persistent errors will still show up
on the console.

The full `talosctl logs controller-runtime` are not suppressed.

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
2024-07-05 15:36:54 +04:00

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

// Package ntp provides a time sync client via the SNTP protocol.
package ntp

import (
	"bytes"
	"context"
	"fmt"
	"math/bits"
	"net"
	"os"
	"slices"
	"strings"
	"sync"
	"time"

	"github.com/beevik/ntp"
	"github.com/u-root/u-root/pkg/rtc"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"golang.org/x/sys/unix"

	"github.com/siderolabs/talos/internal/pkg/ntp/internal/spike"
	"github.com/siderolabs/talos/internal/pkg/timex"
)

// Syncer performs time sync via NTP on schedule.
type Syncer struct {
	logger *zap.Logger

	timeServersMu  sync.Mutex
	timeServers    []string
	lastSyncServer string

	timeSyncNotified bool
	timeSynced       chan struct{}

	restartSyncCh chan struct{}
	epochChangeCh chan struct{}

	firstSync bool

	spikeDetector spike.Detector

	MinPoll, MaxPoll, RetryPoll time.Duration

	// these functions are overridden in tests for mocking support
	CurrentTime CurrentTimeFunc
	NTPQuery    QueryFunc
	AdjustTime  AdjustTimeFunc
}

// Measurement is a struct containing correction data based on a time request.
type Measurement struct {
	ClockOffset time.Duration     // measured offset of the local clock relative to the server
	Leap        ntp.LeapIndicator // leap second indicator reported by the server
	Spike       bool              // true when the spike detector flagged the sample as an outlier
}

// NewSyncer creates a new Syncer with default configuration.
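//
// A minimal usage sketch (hypothetical caller code; the logger, context and
// server list are placeholders):
//
//	syncer := ntp.NewSyncer(logger, []string{"pool.ntp.org"})
//	go syncer.Run(ctx)
//	<-syncer.Synced() // unblocks after the first successful time sync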
func NewSyncer(logger *zap.Logger, timeServers []string) *Syncer {
	syncer := &Syncer{
		logger:        logger,
		timeServers:   slices.Clone(timeServers),
		timeSynced:    make(chan struct{}),
		restartSyncCh: make(chan struct{}, 1),
		epochChangeCh: make(chan struct{}, 1),
		firstSync:     true,
		spikeDetector: spike.Detector{},
		MinPoll:       MinAllowablePoll,
		MaxPoll:       MaxAllowablePoll,
		RetryPoll:     RetryPoll,
		CurrentTime:   time.Now,
		NTPQuery:      ntp.Query,
		AdjustTime:    timex.Adjtimex,
	}

	return syncer
}

// Synced returns a channel which is closed when time is in sync.
func (syncer *Syncer) Synced() <-chan struct{} {
	return syncer.timeSynced
}

// EpochChange returns a channel which receives a value each time the clock jumps by more than EpochLimit.
func (syncer *Syncer) EpochChange() <-chan struct{} {
	return syncer.epochChangeCh
}

func (syncer *Syncer) getTimeServers() []string {
	syncer.timeServersMu.Lock()
	defer syncer.timeServersMu.Unlock()

	return syncer.timeServers
}

func (syncer *Syncer) getLastSyncServer() string {
	syncer.timeServersMu.Lock()
	defer syncer.timeServersMu.Unlock()

	return syncer.lastSyncServer
}

func (syncer *Syncer) setLastSyncServer(lastSyncServer string) {
	syncer.timeServersMu.Lock()
	defer syncer.timeServersMu.Unlock()

	syncer.lastSyncServer = lastSyncServer
}

// SetTimeServers sets the list of time servers to use.
func (syncer *Syncer) SetTimeServers(timeServers []string) {
	syncer.timeServersMu.Lock()
	defer syncer.timeServersMu.Unlock()

	if slices.Equal(timeServers, syncer.timeServers) {
		return
	}

	syncer.timeServers = slices.Clone(timeServers)
	syncer.lastSyncServer = ""

	syncer.restartSync()
}

func (syncer *Syncer) restartSync() {
	// non-blocking send: a single pending restart signal is enough to restart the sync loop
	select {
	case syncer.restartSyncCh <- struct{}{}:
	default:
	}
}

func absDuration(d time.Duration) time.Duration {
	if d < 0 {
		return -d
	}

	return d
}

func (syncer *Syncer) isSpike(resp *ntp.Response) bool {
	return syncer.spikeDetector.IsSpike(spike.SampleFromNTPResponse(resp))
}

// Run runs the sync process.
//
// Run is usually run in a goroutine.
// When the context is canceled, the sync process aborts.
//
//nolint:gocyclo,cyclop
func (syncer *Syncer) Run(ctx context.Context) {
	RTCClockInitialize.Do(func() {
		var err error

		RTCClock, err = rtc.OpenRTC()
		if err != nil {
			syncer.logger.Error("failure opening RTC, ignored", zap.Error(err))
		}
	})

	pollInterval := time.Duration(0)

	for {
		lastSyncServer, resp, err := syncer.query(ctx)
		if err != nil {
			return
		}

		spike := false

		if resp != nil {
			spike = resp.Spike
		}
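
		// Adapt the poll interval to measurement quality: the interval doubles
		// while the offset stays well within ExpectedAccuracy, and halves on
		// spikes or large offsets; MinPoll is enforced as a floor below
		// whenever there was a response.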
		switch {
		case resp == nil:
			// no response was received at all: sleep only for a short retry interval, as this is not a Kiss-o'-Death response
			pollInterval = syncer.RetryPoll
		case pollInterval == 0:
			// first sync
			pollInterval = syncer.MinPoll
		case !spike && absDuration(resp.ClockOffset) > ExpectedAccuracy:
			// huge offset, retry sync with minimum interval
			pollInterval = syncer.MinPoll
		case absDuration(resp.ClockOffset) < ExpectedAccuracy*25/100: // *0.25
			// clock offset is within 25% of the expected accuracy, increase the poll interval
			if pollInterval < syncer.MaxPoll {
				pollInterval *= 2
			}
		case spike || absDuration(resp.ClockOffset) > ExpectedAccuracy*75/100: // *0.75
			// a spike was detected or the clock offset is too large, decrease the poll interval
			if pollInterval > syncer.MinPoll {
				pollInterval /= 2
			}
		}

		if resp != nil && pollInterval < syncer.MinPoll {
			// set the poll interval to at least MinPoll if there was any response
			pollInterval = syncer.MinPoll
		}
syncer.logger.Debug("sample stats",
zap.Duration("jitter", time.Duration(syncer.spikeDetector.Jitter()*float64(time.Second))),
zap.Duration("poll_interval", pollInterval),
zap.Bool("spike", spike),
zap.Bool("resp_exists", resp != nil),
)
if resp != nil && !spike {
err = syncer.adjustTime(resp.ClockOffset, resp.Leap, lastSyncServer, pollInterval)
if err == nil {
if !syncer.timeSyncNotified {
// successful first time sync, notify about it
close(syncer.timeSynced)
syncer.timeSyncNotified = true
}
} else {
syncer.logger.Error("error adjusting time", zap.Error(err))
}
}
select {
case <-ctx.Done():
return
case <-syncer.restartSyncCh:
// time servers got changed, restart the loop immediately
case <-time.After(pollInterval):
}
}
}

func (syncer *Syncer) query(ctx context.Context) (lastSyncServer string, measurement *Measurement, err error) {
	lastSyncServer = syncer.getLastSyncServer()
	failedServer := ""

	if lastSyncServer != "" {
		measurement, err = syncer.queryServer(lastSyncServer)
		if err != nil {
			syncer.logger.Error(fmt.Sprintf("time query error with server %q", lastSyncServer), zap.Error(err))

			failedServer = lastSyncServer
			lastSyncServer = ""
			err = nil
		}
	}

	if lastSyncServer == "" {
		var serverList []string

		serverList, err = syncer.resolveServers(ctx)
		if err != nil {
			return lastSyncServer, measurement, err
		}

		for _, server := range serverList {
			if server == failedServer {
				// skip the server which failed in the previous sync to avoid querying it again after a short interval
				continue
			}

			select {
			case <-ctx.Done():
				return lastSyncServer, measurement, ctx.Err()
			case <-syncer.restartSyncCh:
				return lastSyncServer, measurement, nil
			default:
			}

			measurement, err = syncer.queryServer(server)
			if err != nil {
				syncer.logger.Error(fmt.Sprintf("time query error with server %q", server), zap.Error(err))

				err = nil
			} else {
				syncer.setLastSyncServer(server)
				lastSyncServer = server

				break
			}
		}
	}

	return lastSyncServer, measurement, err
}

// isPTPDevice reports whether the server string refers to a PTP hardware clock device (e.g. "/dev/ptp0").
func (syncer *Syncer) isPTPDevice(server string) bool {
	return strings.HasPrefix(server, "/dev/")
}

func (syncer *Syncer) resolveServers(ctx context.Context) ([]string, error) {
	var serverList []string

	for _, server := range syncer.getTimeServers() {
		if syncer.isPTPDevice(server) {
			serverList = append(serverList, server)
		} else {
			ips, err := net.LookupIP(server)
			if err != nil {
				syncer.logger.Error(fmt.Sprintf("failed looking up %q, ignored", server), zap.Error(err))
			}

			for _, ip := range ips {
				serverList = append(serverList, ip.String())
			}
		}

		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}
	}

	return serverList, nil
}

func (syncer *Syncer) queryServer(server string) (*Measurement, error) {
	if syncer.isPTPDevice(server) {
		return syncer.queryPTP(server)
	}

	return syncer.queryNTP(server)
}

func (syncer *Syncer) queryPTP(server string) (*Measurement, error) {
	phc, err := os.Open(server)
	if err != nil {
		return nil, err
	}

	defer phc.Close() //nolint:errcheck

	// From clock_gettime(2):
	//
	//   Using the appropriate macros, open file descriptors may be converted
	//   into clock IDs and passed to clock_gettime(), clock_settime(), and
	//   clock_adjtime(2). The following example shows how to convert a file
	//   descriptor into a dynamic clock ID.
	//
	//   #define CLOCKFD 3
	//   #define FD_TO_CLOCKID(fd) ((~(clockid_t) (fd) << 3) | CLOCKFD)
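	//
	// The expression below is that macro translated to Go: clockid_t is a
	// signed 32-bit integer, which is why the inverted, shifted descriptor
	// is truncated to int32.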
	clockid := int32(3 | (^phc.Fd() << 3))

	var ts unix.Timespec

	err = unix.ClockGettime(clockid, &ts)
	if err != nil {
		return nil, err
	}

	offset := time.Until(time.Unix(ts.Sec, ts.Nsec))

	syncer.logger.Debug("PTP clock",
		zap.Duration("clock_offset", offset),
		zap.Int64("sec", ts.Sec),
		zap.Int64("nsec", ts.Nsec),
		zap.String("device", server),
	)

	meas := &Measurement{
		ClockOffset: offset,
		Leap:        0,
		Spike:       false,
	}

	return meas, err
}

func (syncer *Syncer) queryNTP(server string) (*Measurement, error) {
	resp, err := syncer.NTPQuery(server)
	if err != nil {
		return nil, err
	}

	syncer.logger.Debug("NTP response",
		zap.Duration("clock_offset", resp.ClockOffset),
		zap.Duration("rtt", resp.RTT),
		zap.Uint8("leap", uint8(resp.Leap)),
		zap.Uint8("stratum", resp.Stratum),
		zap.Duration("precision", resp.Precision),
		zap.Duration("root_delay", resp.RootDelay),
		zap.Duration("root_dispersion", resp.RootDispersion),
		zap.Duration("root_distance", resp.RootDistance),
	)

	validationError := resp.Validate()

	measurement := &Measurement{
		ClockOffset: resp.ClockOffset,
		Leap:        resp.Leap,
		Spike:       false,
	}
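
	// feed the spike detector only with samples that passed validation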
	if validationError == nil {
		measurement.Spike = syncer.isSpike(resp)
	}

	return measurement, validationError
}

// log2i returns the integer base-2 logarithm of v, returning 0 for both v == 0 and v == 1.
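// For example, log2i(32) == 5, and log2i(48) == 5 as well (the result truncates down).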
func log2i(v uint64) int {
	if v == 0 {
		return 0
	}

	return 63 - bits.LeadingZeros64(v)
}

// adjustTime adjusts the system clock by the given offset: offsets beyond AdjustTimeLimit are applied as a jump (step), smaller ones are slewed.
//
//nolint:gocyclo
func (syncer *Syncer) adjustTime(offset time.Duration, leapSecond ntp.LeapIndicator, server string, nextPollInterval time.Duration) error {
	var (
		buf  bytes.Buffer
		req  unix.Timex
		jump bool
	)

	if offset < -AdjustTimeLimit || offset > AdjustTimeLimit {
		jump = true

		fmt.Fprintf(&buf, "adjusting time (jump) by %s via %s", offset, server)

		req = unix.Timex{
			Modes: unix.ADJ_SETOFFSET | unix.ADJ_NANO | unix.ADJ_STATUS | unix.ADJ_MAXERROR | unix.ADJ_ESTERROR,
			Time: unix.Timeval{
				Sec:  int64(offset / time.Second),
				Usec: int64(offset / time.Nanosecond % time.Second),
			},
			Maxerror: 0,
			Esterror: 0,
		}

		// kernel wants tv_usec to be positive
		if req.Time.Usec < 0 {
			req.Time.Sec--
			req.Time.Usec += int64(time.Second / time.Nanosecond)
		}
	} else {
fmt.Fprintf(&buf, "adjusting time (slew) by %s via %s", offset, server)
pollSeconds := uint64(nextPollInterval / time.Second)
log2iPollSeconds := log2i(pollSeconds)
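
		// Constant sets the kernel PLL time constant (see adjtimex(2)):
		// deriving it from log2 of the poll interval means longer polls
		// produce slower, smoother frequency corrections.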
		req = unix.Timex{
			Modes:    unix.ADJ_OFFSET | unix.ADJ_NANO | unix.ADJ_STATUS | unix.ADJ_TIMECONST | unix.ADJ_MAXERROR | unix.ADJ_ESTERROR,
			Offset:   int64(offset / time.Nanosecond),
			Status:   unix.STA_PLL,
			Maxerror: 0,
			Esterror: 0,
			Constant: int64(log2iPollSeconds) - 4,
		}
	}

	switch leapSecond { //nolint:exhaustive
	case ntp.LeapAddSecond:
		req.Status |= unix.STA_INS
	case ntp.LeapDelSecond:
		req.Status |= unix.STA_DEL
	}

	logLevel := zapcore.DebugLevel

	if jump {
		logLevel = zapcore.InfoLevel
	}

	state, err := syncer.AdjustTime(&req)

	fmt.Fprintf(&buf, ", state %s, status %s", state, timex.Status(req.Status))

	if err != nil {
		logLevel = zapcore.WarnLevel

		fmt.Fprintf(&buf, ", error was %s", err)
	}

	if syncer.firstSync && logLevel == zapcore.DebugLevel {
		// promote first sync to info level
		syncer.firstSync = false

		logLevel = zapcore.InfoLevel
	}

	if ce := syncer.logger.Check(logLevel, buf.String()); ce != nil {
		ce.Write()
	}

	syncer.logger.Debug("adjtime state",
		zap.Int64("constant", req.Constant),
		zap.Duration("offset", time.Duration(req.Offset)),
		zap.Int64("freq_offset", req.Freq),
		zap.Int64("freq_offset_ppm", req.Freq/65536),
	)

	if err == nil {
		if offset < -EpochLimit || offset > EpochLimit {
			// notify about epoch change
			select {
			case syncer.epochChangeCh <- struct{}{}:
			default:
			}
		}

		if jump {
			if RTCClock != nil {
				if rtcErr := RTCClock.Set(time.Now().Add(offset)); rtcErr != nil {
					syncer.logger.Error("error syncing RTC", zap.Error(rtcErr))
				} else {
					syncer.logger.Info("synchronized RTC with system clock")
				}
			}
		}
	}

	return err
}