From 6aa10576c902b0930c72f0c0113bb6e8636cfd1e Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Mon, 13 Apr 2026 21:18:32 +0000 Subject: [PATCH] wgengine/magicsock: deflake TestTwoDevicePing compare-metrics-stats The compare-metrics-stats subtest reset two independent counting systems (physical connection counters and expvar.Int user metrics) non-atomically. Background WireGuard keepalives arriving between the resets could increment one system but not the other, causing off-by-one packet/byte mismatches in either direction. Replace the reset-then-compare pattern with snapshot-and-delta: snapshot both systems before pings, snapshot again after, and compare the deltas. This eliminates the non-atomic reset window entirely. As a belt-and-suspenders safety net, tolerate a difference of exactly one packet (and corresponding bytes) from a stray keepalive that could still arrive in the narrow window between the two snapshots. flakestress passes with ~5900 runs (~2800 without -race, ~3100 with -race) but it also passed previously too. This is an annoying one to repro. Fixes #11762 Change-Id: I3447ad67e71c8146e85eed38b7a665033ef9e284 Signed-off-by: Brad Fitzpatrick --- wgengine/magicsock/magicsock_test.go | 191 ++++++++++++++++----------- 1 file changed, 113 insertions(+), 78 deletions(-) diff --git a/wgengine/magicsock/magicsock_test.go b/wgengine/magicsock/magicsock_test.go index b7492b867..412c5cf71 100644 --- a/wgengine/magicsock/magicsock_test.go +++ b/wgengine/magicsock/magicsock_test.go @@ -39,7 +39,6 @@ import ( "go4.org/mem" "golang.org/x/net/icmp" "golang.org/x/net/ipv4" - "tailscale.com/cmd/testwrapper/flakytest" "tailscale.com/control/controlknobs" "tailscale.com/derp/derpserver" "tailscale.com/disco" @@ -733,7 +732,6 @@ func (localhostListener) ListenPacket(ctx context.Context, network, address stri } func TestTwoDevicePing(t *testing.T) { - flakytest.Mark(t, "https://github.com/tailscale/tailscale/issues/11762") ln, ip := localhostListener{}, netaddr.IPv4(127, 0, 0, 1) n := &devices{ m1: ln, @@ -1265,96 +1263,133 @@ func testTwoDevicePing(t *testing.T, d *devices) { t.Run("compare-metrics-stats", func(t *testing.T) { setT(t) defer setT(outerT) - m1.counts.Reset() - m2.counts.Reset() - m1.conn.resetMetricsForTest() - m2.conn.resetMetricsForTest() - t.Logf("Metrics before: %s\n", m1.metrics.String()) + + // Snapshot both counting systems before pings rather than + // resetting them. Resetting two independent systems + // non-atomically left a window where background WireGuard + // keepalives could increment one system but not the other, + // causing flaky off-by-one mismatches. + physBefore1, metricBefore1 := snapshotCounts(m1) + physBefore2, metricBefore2 := snapshotCounts(m2) + ping1(t) ping2(t) - assertConnStatsAndUserMetricsEqual(t, m1) - assertConnStatsAndUserMetricsEqual(t, m2) + + assertConnStatDeltasMatchMetricDeltas(t, m1, physBefore1, metricBefore1) + assertConnStatDeltasMatchMetricDeltas(t, m2, physBefore2, metricBefore2) assertGlobalMetricsMatchPerConn(t, m1, m2) - t.Logf("Metrics after: %s\n", m1.metrics.String()) }) } -func (c *Conn) resetMetricsForTest() { - c.metrics.inboundBytesIPv4Total.Set(0) - c.metrics.inboundPacketsIPv4Total.Set(0) - c.metrics.outboundBytesIPv4Total.Set(0) - c.metrics.outboundPacketsIPv4Total.Set(0) - c.metrics.inboundBytesIPv6Total.Set(0) - c.metrics.inboundPacketsIPv6Total.Set(0) - c.metrics.outboundBytesIPv6Total.Set(0) - c.metrics.outboundPacketsIPv6Total.Set(0) - c.metrics.inboundBytesDERPTotal.Set(0) - c.metrics.inboundPacketsDERPTotal.Set(0) - c.metrics.outboundBytesDERPTotal.Set(0) - c.metrics.outboundPacketsDERPTotal.Set(0) +// countSnapshot holds a point-in-time snapshot of packet/byte statistics, +// categorized by transport type (IPv4 vs DERP). +type countSnapshot struct { + ipv4RxBytes, ipv4TxBytes int64 + ipv4RxPackets, ipv4TxPackets int64 + derpRxBytes, derpTxBytes int64 + derpRxPackets, derpTxPackets int64 } -func assertConnStatsAndUserMetricsEqual(t *testing.T, ms *magicStack) { - t.Helper() - physIPv4RxBytes := int64(0) - physIPv4TxBytes := int64(0) - physDERPRxBytes := int64(0) - physDERPTxBytes := int64(0) - physIPv4RxPackets := int64(0) - physIPv4TxPackets := int64(0) - physDERPRxPackets := int64(0) - physDERPTxPackets := int64(0) +// snapshotCounts captures the current physical connection counter values and +// user metrics for ms, returning them as separate snapshots. Reading both +// systems back-to-back (rather than resetting them non-atomically) avoids a +// race where background WireGuard keepalives could increment one system but +// not the other during a reset window. +func snapshotCounts(ms *magicStack) (phys, metric countSnapshot) { for conn, count := range ms.counts.Clone() { - t.Logf("physconn src: %s, dst: %s", conn.Src.String(), conn.Dst.String()) if conn.Dst.String() == "127.3.3.40:1" { - physDERPRxBytes += int64(count.RxBytes) - physDERPTxBytes += int64(count.TxBytes) - physDERPRxPackets += int64(count.RxPackets) - physDERPTxPackets += int64(count.TxPackets) + phys.derpRxBytes += int64(count.RxBytes) + phys.derpTxBytes += int64(count.TxBytes) + phys.derpRxPackets += int64(count.RxPackets) + phys.derpTxPackets += int64(count.TxPackets) } else { - physIPv4RxBytes += int64(count.RxBytes) - physIPv4TxBytes += int64(count.TxBytes) - physIPv4RxPackets += int64(count.RxPackets) - physIPv4TxPackets += int64(count.TxPackets) + phys.ipv4RxBytes += int64(count.RxBytes) + phys.ipv4TxBytes += int64(count.TxBytes) + phys.ipv4RxPackets += int64(count.RxPackets) + phys.ipv4TxPackets += int64(count.TxPackets) } } - - metricIPv4RxBytes := ms.conn.metrics.inboundBytesIPv4Total.Value() - metricIPv4RxPackets := ms.conn.metrics.inboundPacketsIPv4Total.Value() - metricIPv4TxBytes := ms.conn.metrics.outboundBytesIPv4Total.Value() - metricIPv4TxPackets := ms.conn.metrics.outboundPacketsIPv4Total.Value() - - metricDERPRxBytes := ms.conn.metrics.inboundBytesDERPTotal.Value() - metricDERPRxPackets := ms.conn.metrics.inboundPacketsDERPTotal.Value() - metricDERPTxBytes := ms.conn.metrics.outboundBytesDERPTotal.Value() - metricDERPTxPackets := ms.conn.metrics.outboundPacketsDERPTotal.Value() - - // Reset counts after reading all values to minimize the window where a - // background packet could increment metrics but miss the cloned counts. - ms.counts.Reset() - - // Compare physical connection stats with per-conn user metrics. - // A rebind during the measurement window can reset the physical connection - // counter, causing physical stats to show 0 while user metrics recorded - // packets normally. Tolerate this by logging instead of failing. - checkPhysVsMetric := func(phys, metric int64, name string) { - if phys == metric { - return - } - if phys == 0 && metric > 0 { - t.Logf("%s: physical counter is 0 but metric is %d (possible rebind during measurement)", name, metric) - return - } - t.Errorf("%s: physical=%d, metric=%d", name, phys, metric) + metric = countSnapshot{ + ipv4RxBytes: ms.conn.metrics.inboundBytesIPv4Total.Value(), + ipv4TxBytes: ms.conn.metrics.outboundBytesIPv4Total.Value(), + ipv4RxPackets: ms.conn.metrics.inboundPacketsIPv4Total.Value(), + ipv4TxPackets: ms.conn.metrics.outboundPacketsIPv4Total.Value(), + derpRxBytes: ms.conn.metrics.inboundBytesDERPTotal.Value(), + derpTxBytes: ms.conn.metrics.outboundBytesDERPTotal.Value(), + derpRxPackets: ms.conn.metrics.inboundPacketsDERPTotal.Value(), + derpTxPackets: ms.conn.metrics.outboundPacketsDERPTotal.Value(), + } + return phys, metric +} + +// assertConnStatDeltasMatchMetricDeltas checks that the changes in physical +// connection counters since physBefore match the changes in user metrics since +// metricBefore. Using deltas avoids a race from non-atomically resetting the +// two independent counting systems. +// +// As a safety net, a difference of exactly one packet (and the corresponding +// bytes) is tolerated, since a background WireGuard keepalive could still +// arrive in the narrow window between snapshotting the two systems. +func assertConnStatDeltasMatchMetricDeltas(t *testing.T, ms *magicStack, physBefore, metricBefore countSnapshot) { + t.Helper() + physAfter, metricAfter := snapshotCounts(ms) + + type stat struct { + name string + physDelta, metDelta int64 + isPackets bool // true for packet counts, false for byte counts + packetDeltaTolerated bool // set by packet check, used by byte check + } + + stats := []stat{ + {name: "IPv4RxPackets", physDelta: physAfter.ipv4RxPackets - physBefore.ipv4RxPackets, metDelta: metricAfter.ipv4RxPackets - metricBefore.ipv4RxPackets, isPackets: true}, + {name: "IPv4RxBytes", physDelta: physAfter.ipv4RxBytes - physBefore.ipv4RxBytes, metDelta: metricAfter.ipv4RxBytes - metricBefore.ipv4RxBytes}, + {name: "IPv4TxPackets", physDelta: physAfter.ipv4TxPackets - physBefore.ipv4TxPackets, metDelta: metricAfter.ipv4TxPackets - metricBefore.ipv4TxPackets, isPackets: true}, + {name: "IPv4TxBytes", physDelta: physAfter.ipv4TxBytes - physBefore.ipv4TxBytes, metDelta: metricAfter.ipv4TxBytes - metricBefore.ipv4TxBytes}, + {name: "DERPRxPackets", physDelta: physAfter.derpRxPackets - physBefore.derpRxPackets, metDelta: metricAfter.derpRxPackets - metricBefore.derpRxPackets, isPackets: true}, + {name: "DERPRxBytes", physDelta: physAfter.derpRxBytes - physBefore.derpRxBytes, metDelta: metricAfter.derpRxBytes - metricBefore.derpRxBytes}, + {name: "DERPTxPackets", physDelta: physAfter.derpTxPackets - physBefore.derpTxPackets, metDelta: metricAfter.derpTxPackets - metricBefore.derpTxPackets, isPackets: true}, + {name: "DERPTxBytes", physDelta: physAfter.derpTxBytes - physBefore.derpTxBytes, metDelta: metricAfter.derpTxBytes - metricBefore.derpTxBytes}, + } + + // First pass: check packet counts, tolerating ±1 from stray keepalives. + for i := range stats { + s := &stats[i] + if !s.isPackets { + continue + } + if s.physDelta == s.metDelta { + continue + } + diff := s.physDelta - s.metDelta + if diff < 0 { + diff = -diff + } + if diff <= 1 { + s.packetDeltaTolerated = true + t.Logf("%s: physical delta=%d, metric delta=%d (off by 1, likely background WireGuard keepalive)", s.name, s.physDelta, s.metDelta) + continue + } + t.Errorf("%s: physical delta=%d, metric delta=%d", s.name, s.physDelta, s.metDelta) + } + + // Second pass: check byte counts; tolerate mismatches when the + // corresponding packet count was already tolerated. + for i := range stats { + s := &stats[i] + if s.isPackets { + continue + } + if s.physDelta == s.metDelta { + continue + } + // The preceding entry in the slice is always the corresponding packet stat. + if stats[i-1].packetDeltaTolerated { + t.Logf("%s: physical delta=%d, metric delta=%d (within single-packet tolerance)", s.name, s.physDelta, s.metDelta) + continue + } + t.Errorf("%s: physical delta=%d, metric delta=%d", s.name, s.physDelta, s.metDelta) } - checkPhysVsMetric(physDERPRxBytes, metricDERPRxBytes, "DERPRxBytes") - checkPhysVsMetric(physDERPTxBytes, metricDERPTxBytes, "DERPTxBytes") - checkPhysVsMetric(physIPv4RxBytes, metricIPv4RxBytes, "IPv4RxBytes") - checkPhysVsMetric(physIPv4TxBytes, metricIPv4TxBytes, "IPv4TxBytes") - checkPhysVsMetric(physDERPRxPackets, metricDERPRxPackets, "DERPRxPackets") - checkPhysVsMetric(physDERPTxPackets, metricDERPTxPackets, "DERPTxPackets") - checkPhysVsMetric(physIPv4RxPackets, metricIPv4RxPackets, "IPv4RxPackets") - checkPhysVsMetric(physIPv4TxPackets, metricIPv4TxPackets, "IPv4TxPackets") } // assertGlobalMetricsMatchPerConn validates that the global clientmetric