From db5815fb978db0873752618d4531ee2ac9f5f83d Mon Sep 17 00:00:00 2001 From: "M. J. Fromberger" Date: Tue, 28 Oct 2025 08:45:22 -0700 Subject: [PATCH] Revert "logtail: avoid racing eventbus subscriptions with Shutdown (#17639)" (#17684) This reverts commit 4346615d77a6de16854c6e78f9d49375d6424e6e. We averted the shutdown race, but will need to service the subscriber even when we are not waiting for a change so that we do not delay the bus as a whole. Updates #17638 Change-Id: I5488466ed83f5ad1141c95267f5ae54878a24657 Signed-off-by: M. J. Fromberger --- logtail/logtail.go | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/logtail/logtail.go b/logtail/logtail.go index 52823fedf..675422890 100644 --- a/logtail/logtail.go +++ b/logtail/logtail.go @@ -124,7 +124,6 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger { if cfg.Bus != nil { l.eventClient = cfg.Bus.Client("logtail.Logger") - l.changeDeltaSub = eventbus.Subscribe[netmon.ChangeDelta](l.eventClient) } l.SetSockstatsLabel(sockstats.LabelLogtailLogger) l.compressLogs = cfg.CompressLogs @@ -163,7 +162,6 @@ type Logger struct { httpDoCalls atomic.Int32 sockstatsLabel atomicSocktatsLabel eventClient *eventbus.Client - changeDeltaSub *eventbus.Subscriber[netmon.ChangeDelta] procID uint32 includeProcSequence bool @@ -429,23 +427,8 @@ func (l *Logger) internetUp() bool { func (l *Logger) awaitInternetUp(ctx context.Context) { if l.eventClient != nil { - for { - if l.internetUp() { - return - } - select { - case <-ctx.Done(): - return // give up - case <-l.changeDeltaSub.Done(): - return // give up (closing down) - case delta := <-l.changeDeltaSub.Events(): - if delta.New.AnyInterfaceUp() || l.internetUp() { - fmt.Fprintf(l.stderr, "logtail: internet back up\n") - return - } - fmt.Fprintf(l.stderr, "logtail: network changed, but is not up") - } - } + l.awaitInternetUpBus(ctx) + return } upc := make(chan bool, 1) defer l.netMonitor.RegisterChangeCallback(func(delta *netmon.ChangeDelta) { @@ -466,6 +449,24 @@ func (l *Logger) awaitInternetUp(ctx context.Context) { } } +func (l *Logger) awaitInternetUpBus(ctx context.Context) { + if l.internetUp() { + return + } + sub := eventbus.Subscribe[netmon.ChangeDelta](l.eventClient) + defer sub.Close() + select { + case delta := <-sub.Events(): + if delta.New.AnyInterfaceUp() { + fmt.Fprintf(l.stderr, "logtail: internet back up\n") + return + } + fmt.Fprintf(l.stderr, "logtail: network changed, but is not up") + case <-ctx.Done(): + return + } +} + // upload uploads body to the log server. // origlen indicates the pre-compression body length. // origlen of -1 indicates that the body is not compressed.