From e8a476ef5a06b0158ecb8a4cc8157fe49117595b Mon Sep 17 00:00:00 2001 From: Klaus Post Date: Sat, 16 Nov 2024 09:18:48 -0800 Subject: [PATCH] Keep larger merge buffers for RPC (#20654) Keep larger merge buffers When sending large messages >1K, the merge buffer would continuously be reallocated. This could happen on listings, where blocks are typically 4->8K. Keep merge buffer of up to 256KB. Benchmark with 4096b messages: ``` benchmark old ns/op new ns/op delta BenchmarkRequests/servers=2/bytes/par=32-32 8271 6360 -23.10% BenchmarkRequests/servers=2/bytes/par=64-32 7840 4731 -39.66% BenchmarkRequests/servers=2/bytes/par=128-32 7291 4740 -34.99% BenchmarkRequests/servers=2/bytes/par=256-32 7095 4580 -35.45% BenchmarkRequests/servers=2/bytes/par=512-32 6757 4584 -32.16% BenchmarkRequests/servers=2/bytes/par=1024-32 6429 4453 -30.74% benchmark old bytes new bytes delta BenchmarkRequests/servers=2/bytes/par=32-32 12090 821 -93.21% BenchmarkRequests/servers=2/bytes/par=64-32 17423 820 -95.29% BenchmarkRequests/servers=2/bytes/par=128-32 18493 822 -95.56% BenchmarkRequests/servers=2/bytes/par=256-32 18892 821 -95.65% BenchmarkRequests/servers=2/bytes/par=512-32 19064 826 -95.67% BenchmarkRequests/servers=2/bytes/par=1024-32 19038 842 -95.58% ``` --- internal/grid/connection.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/internal/grid/connection.go b/internal/grid/connection.go index 40f84ba46..0cf2e02cc 100644 --- a/internal/grid/connection.go +++ b/internal/grid/connection.go @@ -1104,7 +1104,6 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont defer ping.Stop() queue := make([][]byte, 0, maxMergeMessages) - merged := make([]byte, 0, writeBufferSize) var queueSize int var buf bytes.Buffer var wsw wsWriter @@ -1132,7 +1131,7 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont } return false } - if buf.Cap() > writeBufferSize*4 { + if buf.Cap() > writeBufferSize*8 { // Reset buffer if it gets too big, so we don't keep it around. buf = bytes.Buffer{} } @@ -1140,6 +1139,8 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont return true } + // Merge buffer to keep between calls + merged := make([]byte, 0, writeBufferSize) for { var toSend []byte select { @@ -1238,17 +1239,17 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont fmt.Println("Merging", len(queue), "messages") } - toSend = merged[:0] + merged = merged[:0] m := message{Op: OpMerged, Seq: uint32(len(queue))} var err error - toSend, err = m.MarshalMsg(toSend) + merged, err = m.MarshalMsg(merged) if err != nil { gridLogIf(ctx, fmt.Errorf("msg.MarshalMsg: %w", err)) return } // Append as byte slices. for _, q := range queue { - toSend = msgp.AppendBytes(toSend, q) + merged = msgp.AppendBytes(merged, q) PutByteBuffer(q) } queue = queue[:0] @@ -1256,14 +1257,17 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont // Combine writes. // Consider avoiding buffer copy. - err = wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend) + err = wsw.writeMessage(&buf, c.side, ws.OpBinary, merged) if err != nil { if !xnet.IsNetworkOrHostDown(err, true) { gridLogIf(ctx, fmt.Errorf("ws writeMessage: %w", err)) } return } - + if cap(merged) > writeBufferSize*8 { + // If we had to send an excessively large package, reset size. + merged = make([]byte, 0, writeBufferSize) + } if !writeBuffer() { return }