diff --git a/cmd/bootstrap-peer-server.go b/cmd/bootstrap-peer-server.go index 8b8efeb34..52dc77298 100644 --- a/cmd/bootstrap-peer-server.go +++ b/cmd/bootstrap-peer-server.go @@ -152,10 +152,12 @@ func (client *bootstrapRESTClient) Verify(ctx context.Context, srcCfg *ServerSys return nil } - recvCfg, err := serverVerifyHandler.Call(ctx, client.gridConn, grid.NewMSSWith(map[string]string{})) + recvCfg, err := serverVerifyHandler.Call(ctx, client.gridConn, grid.NewMSS()) if err != nil { return err } + // We do not need the response after returning. + defer serverVerifyHandler.PutResponse(recvCfg) return srcCfg.Diff(recvCfg) } diff --git a/cmd/peer-rest-server.go b/cmd/peer-rest-server.go index 7be90b4e6..8f30da887 100644 --- a/cmd/peer-rest-server.go +++ b/cmd/peer-rest-server.go @@ -981,7 +981,7 @@ func (s *peerRESTServer) ListenHandler(ctx context.Context, v *grid.URLValues, o logger.LogOnceIf(ctx, err, "event: Encode failed") continue } - out <- grid.NewBytesWith(append(grid.GetByteBuffer()[:0], buf.Bytes()...)) + out <- grid.NewBytesWithCopyOf(buf.Bytes()) } } } diff --git a/cmd/storage-rest-client.go b/cmd/storage-rest-client.go index 8990b5ced..936cb5434 100644 --- a/cmd/storage-rest-client.go +++ b/cmd/storage-rest-client.go @@ -346,6 +346,7 @@ func (client *storageRESTClient) StatVol(ctx context.Context, volume string) (vo return vol, toStorageErr(err) } vol = *v + // Performs shallow copy, so we can reuse. storageStatVolHandler.PutResponse(v) return vol, nil } @@ -455,6 +456,7 @@ func (client *storageRESTClient) RenameData(ctx context.Context, srcVolume, srcP if err != nil { return 0, toStorageErr(err) } + defer storageRenameDataHandler.PutResponse(resp) return resp.Signature, nil } diff --git a/cmd/storage-rest-server.go b/cmd/storage-rest-server.go index 08c1e6414..574818d0d 100644 --- a/cmd/storage-rest-server.go +++ b/cmd/storage-rest-server.go @@ -201,7 +201,7 @@ func (s *storageRESTServer) HealthHandler(w http.ResponseWriter, r *http.Request // DiskInfo types. // DiskInfo.Metrics elements are shared, so we cannot reuse. var storageDiskInfoHandler = grid.NewSingleHandler[*DiskInfoOptions, *DiskInfo](grid.HandlerDiskInfo, func() *DiskInfoOptions { return &DiskInfoOptions{} }, - func() *DiskInfo { return &DiskInfo{} }).WithSharedResponse() + func() *DiskInfo { return &DiskInfo{} }).WithSharedResponse().AllowCallRequestPool(true) // DiskInfoHandler - returns disk info. func (s *storageRESTServer) DiskInfoHandler(opts *DiskInfoOptions) (*DiskInfo, *grid.RemoteErr) { @@ -495,7 +495,7 @@ func (s *storageRESTServer) CheckPartsHandler(p *CheckPartsHandlerParams) (grid. var storageReadAllHandler = grid.NewSingleHandler[*ReadAllHandlerParams, *grid.Bytes](grid.HandlerReadAll, func() *ReadAllHandlerParams { return &ReadAllHandlerParams{} -}, grid.NewBytes) +}, grid.NewBytes).AllowCallRequestPool(true) // ReadAllHandler - read all the contents of a file. func (s *storageRESTServer) ReadAllHandler(p *ReadAllHandlerParams) (*grid.Bytes, *grid.RemoteErr) { @@ -673,7 +673,7 @@ func (s *storageRESTServer) ListDirHandler(w http.ResponseWriter, r *http.Reques var storageDeleteFileHandler = grid.NewSingleHandler[*DeleteFileHandlerParams, grid.NoPayload](grid.HandlerDeleteFile, func() *DeleteFileHandlerParams { return &DeleteFileHandlerParams{} -}, grid.NewNoPayload) +}, grid.NewNoPayload).AllowCallRequestPool(true) // DeleteFileHandler - delete a file. func (s *storageRESTServer) DeleteFileHandler(p *DeleteFileHandlerParams) (grid.NoPayload, *grid.RemoteErr) { @@ -751,7 +751,7 @@ func (s *storageRESTServer) RenameDataHandler(p *RenameDataHandlerParams) (*Rena var storageRenameFileHandler = grid.NewSingleHandler[*RenameFileHandlerParams, grid.NoPayload](grid.HandlerRenameFile, func() *RenameFileHandlerParams { return &RenameFileHandlerParams{} -}, grid.NewNoPayload) +}, grid.NewNoPayload).AllowCallRequestPool(true) // RenameFileHandler - rename a file from source to destination func (s *storageRESTServer) RenameFileHandler(p *RenameFileHandlerParams) (grid.NoPayload, *grid.RemoteErr) { diff --git a/internal/grid/grid_test.go b/internal/grid/grid_test.go index 0ce9afaaa..5c9942c9f 100644 --- a/internal/grid/grid_test.go +++ b/internal/grid/grid_test.go @@ -231,6 +231,7 @@ func TestSingleRoundtripGenerics(t *testing.T) { t.Errorf("want %q, got %q", testPayload, resp.OrgString) } t.Log("Roundtrip:", time.Since(start)) + h1.PutResponse(resp) start = time.Now() resp, err = h2.Call(context.Background(), remoteConn, &testRequest{Num: 1, String: testPayload}) @@ -241,9 +242,74 @@ func TestSingleRoundtripGenerics(t *testing.T) { if resp != nil { t.Errorf("want nil, got %q", resp) } + h2.PutResponse(resp) t.Log("Roundtrip:", time.Since(start)) } +func TestSingleRoundtripGenericsRecycle(t *testing.T) { + defer testlogger.T.SetLogTB(t)() + errFatal := func(err error) { + t.Helper() + if err != nil { + t.Fatal(err) + } + } + grid, err := SetupTestGrid(2) + errFatal(err) + remoteHost := grid.Hosts[1] + local := grid.Managers[0] + remote := grid.Managers[1] + + // 1: Echo + h1 := NewSingleHandler[*MSS, *MSS](handlerTest, NewMSS, NewMSS) + // Handles incoming requests, returns a response + handler1 := func(req *MSS) (resp *MSS, err *RemoteErr) { + resp = h1.NewResponse() + for k, v := range *req { + (*resp)[k] = v + } + return resp, nil + } + // Return error + h2 := NewSingleHandler[*MSS, *MSS](handlerTest2, NewMSS, NewMSS) + handler2 := func(req *MSS) (resp *MSS, err *RemoteErr) { + defer req.Recycle() + r := RemoteErr(req.Get("err")) + return nil, &r + } + errFatal(h1.Register(local, handler1)) + errFatal(h2.Register(local, handler2)) + + errFatal(h1.Register(remote, handler1)) + errFatal(h2.Register(remote, handler2)) + + // local to remote connection + remoteConn := local.Connection(remoteHost) + const testPayload = "Hello Grid World!" + + start := time.Now() + req := NewMSSWith(map[string]string{"test": testPayload}) + resp, err := h1.Call(context.Background(), remoteConn, req) + errFatal(err) + if resp.Get("test") != testPayload { + t.Errorf("want %q, got %q", testPayload, resp.Get("test")) + } + t.Log("Roundtrip:", time.Since(start)) + h1.PutResponse(resp) + + start = time.Now() + resp, err = h2.Call(context.Background(), remoteConn, NewMSSWith(map[string]string{"err": testPayload})) + t.Log("Roundtrip:", time.Since(start)) + if err != RemoteErr(testPayload) { + t.Errorf("want error %v(%T), got %v(%T)", RemoteErr(testPayload), RemoteErr(testPayload), err, err) + } + if resp != nil { + t.Errorf("want nil, got %q", resp) + } + t.Log("Roundtrip:", time.Since(start)) + h2.PutResponse(resp) +} + func TestStreamSuite(t *testing.T) { defer testlogger.T.SetErrorTB(t)() errFatal := func(err error) { diff --git a/internal/grid/handlers.go b/internal/grid/handlers.go index cc026b651..2773d3229 100644 --- a/internal/grid/handlers.go +++ b/internal/grid/handlers.go @@ -335,14 +335,36 @@ type RoundTripper interface { // SingleHandler is a type safe handler for single roundtrip requests. type SingleHandler[Req, Resp RoundTripper] struct { - id HandlerID - sharedResponse bool + id HandlerID + sharedResp bool + callReuseReq bool - reqPool sync.Pool - respPool sync.Pool + newReq func() Req + newResp func() Resp - nilReq Req - nilResp Resp + recycleReq func(Req) + recycleResp func(Resp) +} + +func recycleFunc[RT RoundTripper](newRT func() RT) (newFn func() RT, recycle func(r RT)) { + rAny := any(newRT()) + if rc, ok := rAny.(Recycler); ok { + return newRT, func(r RT) { + rc.Recycle() + } + } + pool := sync.Pool{ + New: func() interface{} { + return newRT() + }, + } + var rZero RT + return func() RT { return pool.Get().(RT) }, + func(r RT) { + if r != rZero { + pool.Put(r) + } + } } // NewSingleHandler creates a typed handler that can provide Marshal/Unmarshal. @@ -350,27 +372,34 @@ type SingleHandler[Req, Resp RoundTripper] struct { // Use Call to initiate a clientside call. func NewSingleHandler[Req, Resp RoundTripper](h HandlerID, newReq func() Req, newResp func() Resp) *SingleHandler[Req, Resp] { s := SingleHandler[Req, Resp]{id: h} - s.reqPool.New = func() interface{} { - return newReq() - } - s.respPool.New = func() interface{} { - return newResp() + s.newReq, s.recycleReq = recycleFunc[Req](newReq) + s.newResp, s.recycleResp = recycleFunc[Resp](newResp) + if _, ok := any(newReq()).(Recycler); ok { + s.callReuseReq = true } return &s } // PutResponse will accept a response for reuse. -// These should be returned by the caller. +// This can be used by a caller to recycle a response after receiving it from a Call. func (h *SingleHandler[Req, Resp]) PutResponse(r Resp) { - if r != h.nilResp { - h.respPool.Put(r) - } + h.recycleResp(r) } -// WithSharedResponse indicates it is unsafe to reuse the response. +// AllowCallRequestPool indicates it is safe to reuse the request +// on the client side, meaning the request is recycled/pooled when a request is sent. +// CAREFUL: This should only be used when there are no pointers, slices that aren't freshly constructed. +func (h *SingleHandler[Req, Resp]) AllowCallRequestPool(b bool) *SingleHandler[Req, Resp] { + h.callReuseReq = b + return h +} + +// WithSharedResponse indicates it is unsafe to reuse the response +// when it has been returned on a handler. +// This will disable automatic response recycling/pooling. // Typically this is used when the response sharing part of its data structure. func (h *SingleHandler[Req, Resp]) WithSharedResponse() *SingleHandler[Req, Resp] { - h.sharedResponse = true + h.sharedResp = true return h } @@ -378,26 +407,25 @@ func (h *SingleHandler[Req, Resp]) WithSharedResponse() *SingleHandler[Req, Resp // Handlers can use this to create a reusable response. // The response may be reused, so caller should clear any fields. func (h *SingleHandler[Req, Resp]) NewResponse() Resp { - return h.respPool.Get().(Resp) -} - -// putRequest will accept a request for reuse. -// This is not exported, since it shouldn't be needed. -func (h *SingleHandler[Req, Resp]) putRequest(r Req) { - if r != h.nilReq { - h.reqPool.Put(r) - } + return h.newResp() } // NewRequest creates a new request. // Handlers can use this to create a reusable request. // The request may be reused, so caller should clear any fields. func (h *SingleHandler[Req, Resp]) NewRequest() Req { - return h.reqPool.Get().(Req) + return h.newReq() } // Register a handler for a Req -> Resp roundtrip. +// Requests are automatically recycled. func (h *SingleHandler[Req, Resp]) Register(m *Manager, handle func(req Req) (resp Resp, err *RemoteErr), subroute ...string) error { + if h.newReq == nil { + return errors.New("newReq nil in NewSingleHandler") + } + if h.newResp == nil { + return errors.New("newResp nil in NewSingleHandler") + } return m.RegisterSingleHandler(h.id, func(payload []byte) ([]byte, *RemoteErr) { req := h.NewRequest() _, err := req.UnmarshalMsg(payload) @@ -407,13 +435,13 @@ func (h *SingleHandler[Req, Resp]) Register(m *Manager, handle func(req Req) (re return nil, &r } resp, rerr := handle(req) - h.putRequest(req) + h.recycleReq(req) if rerr != nil { PutByteBuffer(payload) return nil, rerr } payload, err = resp.MarshalMsg(payload[:0]) - if !h.sharedResponse { + if !h.sharedResp { h.PutResponse(resp) } if err != nil { @@ -438,7 +466,18 @@ func (h *SingleHandler[Req, Resp]) Call(ctx context.Context, c Requester, req Re if err != nil { return resp, err } - ctx = context.WithValue(ctx, TraceParamsKey{}, req) + switch any(req).(type) { + case *MSS, *Bytes, *URLValues: + ctx = context.WithValue(ctx, TraceParamsKey{}, req) + case *NoPayload: + default: + ctx = context.WithValue(ctx, TraceParamsKey{}, fmt.Sprintf("type=%T", req)) + } + if h.callReuseReq { + defer func() { + h.recycleReq(req) + }() + } res, err := c.Request(ctx, h.id, payload) PutByteBuffer(payload) if err != nil { @@ -764,26 +803,3 @@ func (h *StreamTypeHandler[Payload, Req, Resp]) Call(ctx context.Context, c Stre return &TypedStream[Req, Resp]{responses: stream, newResp: h.NewResponse, Requests: reqT}, nil } - -// NoPayload is a type that can be used for handlers that do not use a payload. -type NoPayload struct{} - -// Msgsize returns 0. -func (p NoPayload) Msgsize() int { - return 0 -} - -// UnmarshalMsg satisfies the interface, but is a no-op. -func (NoPayload) UnmarshalMsg(bytes []byte) ([]byte, error) { - return bytes, nil -} - -// MarshalMsg satisfies the interface, but is a no-op. -func (NoPayload) MarshalMsg(bytes []byte) ([]byte, error) { - return bytes, nil -} - -// NewNoPayload returns an empty NoPayload struct. -func NewNoPayload() NoPayload { - return NoPayload{} -} diff --git a/internal/grid/trace.go b/internal/grid/trace.go index 2fe65644d..800c23133 100644 --- a/internal/grid/trace.go +++ b/internal/grid/trace.go @@ -21,7 +21,6 @@ import ( "context" "fmt" "net/http" - "net/url" "strings" "time" @@ -131,22 +130,26 @@ func (c *muxClient) traceRoundtrip(ctx context.Context, t *tracer, h HandlerID, } // If the context contains a TraceParamsKey, add it to the trace path. v := ctx.Value(TraceParamsKey{}) - if p, ok := v.(*MSS); ok && p != nil { - trace.Path += p.ToQuery() - trace.HTTP.ReqInfo.Path = trace.Path - } else if p, ok := v.(map[string]string); ok { - m := MSS(p) + // Should match SingleHandler.Call checks. + switch typed := v.(type) { + case *MSS: + trace.Path += typed.ToQuery() + case map[string]string: + m := MSS(typed) trace.Path += m.ToQuery() - trace.HTTP.ReqInfo.Path = trace.Path - } else if v != nil { - // Print exported fields as single request to path. - obj := fmt.Sprintf("%+v", v) - if len(obj) > 1024 { - obj = obj[:1024] + "..." + case *URLValues: + trace.Path += typed.Values().Encode() + case *NoPayload: + case *Bytes: + if typed != nil { + trace.Path = fmt.Sprintf("%s?bytes=%d", trace.Path, len(*typed)) } - trace.Path = fmt.Sprintf("%s?req=%s", trace.Path, url.QueryEscape(obj)) - trace.HTTP.ReqInfo.Path = trace.Path + case string: + trace.Path = fmt.Sprintf("%s?%s", trace.Path, typed) + default: } + trace.HTTP.ReqInfo.Path = trace.Path + t.Publisher.Publish(trace) return resp, err } diff --git a/internal/grid/types.go b/internal/grid/types.go index 8d900115d..e29387b2c 100644 --- a/internal/grid/types.go +++ b/internal/grid/types.go @@ -27,6 +27,14 @@ import ( "github.com/tinylib/msgp/msgp" ) +// Recycler will override the internal reuse in typed handlers. +// When this is supported, the handler will not do internal pooling of objects, +// call Recycle() when the object is no longer needed. +// The recycler should handle nil pointers. +type Recycler interface { + Recycle() +} + // MSS is a map[string]string that can be serialized. // It is not very efficient, but it is only used for easy parameter passing. type MSS map[string]string @@ -39,6 +47,14 @@ func (m *MSS) Get(key string) string { return (*m)[key] } +// Set a key, value pair. +func (m *MSS) Set(key, value string) { + if m == nil { + *m = mssPool.Get().(map[string]string) + } + (*m)[key] = value +} + // UnmarshalMsg deserializes m from the provided byte slice and returns the // remainder of bytes. func (m *MSS) UnmarshalMsg(bts []byte) (o []byte, err error) { @@ -111,7 +127,10 @@ func (m *MSS) Msgsize() int { // NewMSS returns a new MSS. func NewMSS() *MSS { - m := MSS(make(map[string]string)) + m := MSS(mssPool.Get().(map[string]string)) + for k := range m { + delete(m, k) + } return &m } @@ -121,6 +140,20 @@ func NewMSSWith(m map[string]string) *MSS { return &m2 } +var mssPool = sync.Pool{ + New: func() interface{} { + return make(map[string]string, 5) + }, +} + +// Recycle the underlying map. +func (m *MSS) Recycle() { + if m != nil && *m != nil { + mssPool.Put(map[string]string(*m)) + *m = nil + } +} + // ToQuery constructs a URL query string from the MSS, including "?" if there are any keys. func (m MSS) ToQuery() string { if len(m) == 0 { @@ -147,17 +180,36 @@ func (m MSS) ToQuery() string { } // NewBytes returns a new Bytes. +// A slice is preallocated. func NewBytes() *Bytes { b := Bytes(GetByteBuffer()[:0]) return &b } // NewBytesWith returns a new Bytes with the provided content. +// When sent as a parameter, the caller gives up ownership of the byte slice. +// When returned as response, the handler also gives up ownership of the byte slice. func NewBytesWith(b []byte) *Bytes { bb := Bytes(b) return &bb } +// NewBytesWithCopyOf returns a new byte slice with a copy of the provided content. +func NewBytesWithCopyOf(b []byte) *Bytes { + if b == nil { + bb := Bytes(nil) + return &bb + } + if len(b) < maxBufferSize { + bb := NewBytes() + *bb = append(*bb, b...) + return bb + } + bb := Bytes(make([]byte, len(b))) + copy(bb, b) + return &bb +} + // Bytes provides a byte slice that can be serialized. type Bytes []byte @@ -168,6 +220,9 @@ func (b *Bytes) UnmarshalMsg(bytes []byte) ([]byte, error) { return bytes, errors.New("Bytes: UnmarshalMsg on nil pointer") } if bytes, err := msgp.ReadNilBytes(bytes); err == nil { + if *b != nil { + PutByteBuffer(*b) + } *b = nil return bytes, nil } @@ -179,7 +234,16 @@ func (b *Bytes) UnmarshalMsg(bytes []byte) ([]byte, error) { *b = (*b)[:len(val)] copy(*b, val) } else { - *b = append(make([]byte, 0, len(val)), val...) + if cap(*b) == 0 && len(val) <= maxBufferSize { + PutByteBuffer(*b) + *b = GetByteBuffer()[:0] + } else { + PutByteBuffer(*b) + *b = make([]byte, 0, len(val)) + } + in := *b + in = append(in[:0], val...) + *b = in } return bytes, nil } @@ -202,7 +266,8 @@ func (b *Bytes) Msgsize() int { // Recycle puts the Bytes back into the pool. func (b *Bytes) Recycle() { - if *b != nil { + if b != nil && *b != nil { + *b = (*b)[:0] PutByteBuffer(*b) *b = nil } @@ -329,3 +394,29 @@ func (u URLValues) Msgsize() (s int) { } return } + +// NoPayload is a type that can be used for handlers that do not use a payload. +type NoPayload struct{} + +// Msgsize returns 0. +func (p NoPayload) Msgsize() int { + return 0 +} + +// UnmarshalMsg satisfies the interface, but is a no-op. +func (NoPayload) UnmarshalMsg(bytes []byte) ([]byte, error) { + return bytes, nil +} + +// MarshalMsg satisfies the interface, but is a no-op. +func (NoPayload) MarshalMsg(bytes []byte) ([]byte, error) { + return bytes, nil +} + +// NewNoPayload returns an empty NoPayload struct. +func NewNoPayload() NoPayload { + return NoPayload{} +} + +// Recycle is a no-op. +func (NoPayload) Recycle() {}