diff --git a/internal/grid/connection.go b/internal/grid/connection.go index 4d5b45d4f..508126e56 100644 --- a/internal/grid/connection.go +++ b/internal/grid/connection.go @@ -27,6 +27,7 @@ import ( "math" "math/rand" "net" + "runtime" "runtime/debug" "strings" "sync" @@ -980,7 +981,10 @@ func (c *Connection) readStream(ctx context.Context, conn net.Conn, cancel conte if int64(cap(dst)) < hdr.Length+1 { dst = make([]byte, 0, hdr.Length+hdr.Length>>3) } - return readAllInto(dst[:0], &wsReader) + if !hdr.Fin { + hdr.Length = -1 + } + return readAllInto(dst[:0], &wsReader, hdr.Length) } } @@ -1125,10 +1129,16 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont continue } } - if len(queue) < maxMergeMessages && queueSize+len(toSend) < writeBufferSize-1024 && len(c.outQueue) > 0 { - queue = append(queue, toSend) - queueSize += len(toSend) - continue + if len(queue) < maxMergeMessages && queueSize+len(toSend) < writeBufferSize-1024 { + if len(c.outQueue) == 0 { + // Yield to allow more messages to fill. + runtime.Gosched() + } + if len(c.outQueue) > 0 { + queue = append(queue, toSend) + queueSize += len(toSend) + continue + } } c.outMessages.Add(int64(len(queue) + 1)) if c.outgoingBytes != nil { @@ -1158,7 +1168,7 @@ func (c *Connection) writeStream(ctx context.Context, conn net.Conn, cancel cont } c.connChange.L.Unlock() if len(queue) == 0 { - // Combine writes. + // Send single message without merging. buf.Reset() err := wsw.writeMessage(&buf, c.side, ws.OpBinary, toSend) if err != nil { diff --git a/internal/grid/grid.go b/internal/grid/grid.go index 6baf7771c..51bc57afd 100644 --- a/internal/grid/grid.go +++ b/internal/grid/grid.go @@ -57,7 +57,7 @@ const ( biggerBufMax = maxBufferSize // If there is a queue, merge up to this many messages. - maxMergeMessages = 30 + maxMergeMessages = 50 // clientPingInterval will ping the remote handler every 15 seconds. // Clients disconnect when we exceed 2 intervals. @@ -126,7 +126,8 @@ var PutByteBuffer = func(b []byte) { // A successful call returns err == nil, not err == EOF. Because readAllInto is // defined to read from src until EOF, it does not treat an EOF from Read // as an error to be reported. -func readAllInto(b []byte, r *wsutil.Reader) ([]byte, error) { +func readAllInto(b []byte, r *wsutil.Reader, want int64) ([]byte, error) { + read := int64(0) for { if len(b) == cap(b) { // Add more capacity (let append pick how much). @@ -136,10 +137,18 @@ func readAllInto(b []byte, r *wsutil.Reader) ([]byte, error) { b = b[:len(b)+n] if err != nil { if errors.Is(err, io.EOF) { + if want >= 0 && read+int64(n) != want { + return nil, io.ErrUnexpectedEOF + } err = nil } return b, err } + read += int64(n) + if want >= 0 && read == want { + // No need to read more... + return b, nil + } } }