mirror of
				https://github.com/minio/minio.git
				synced 2025-10-31 00:01:27 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			663 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			663 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright (c) 2015-2023 MinIO, Inc.
 | |
| //
 | |
| // This file is part of MinIO Object Storage stack
 | |
| //
 | |
| // This program is free software: you can redistribute it and/or modify
 | |
| // it under the terms of the GNU Affero General Public License as published by
 | |
| // the Free Software Foundation, either version 3 of the License, or
 | |
| // (at your option) any later version.
 | |
| //
 | |
| // This program is distributed in the hope that it will be useful
 | |
| // but WITHOUT ANY WARRANTY; without even the implied warranty of
 | |
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | |
| // GNU Affero General Public License for more details.
 | |
| //
 | |
| // You should have received a copy of the GNU Affero General Public License
 | |
| // along with this program.  If not, see <http://www.gnu.org/licenses/>.
 | |
| 
 | |
| package grid
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/binary"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	xioutil "github.com/minio/minio/internal/ioutil"
 | |
| 	"github.com/zeebo/xxh3"
 | |
| )
 | |
| 
 | |
| // muxClient is a stateful connection to a remote.
 | |
| type muxClient struct {
 | |
| 	MuxID              uint64
 | |
| 	SendSeq, RecvSeq   uint32
 | |
| 	LastPong           int64
 | |
| 	BaseFlags          Flags
 | |
| 	ctx                context.Context
 | |
| 	cancelFn           context.CancelCauseFunc
 | |
| 	parent             *Connection
 | |
| 	respWait           chan<- Response
 | |
| 	respMu             sync.Mutex
 | |
| 	singleResp         bool
 | |
| 	closed             bool
 | |
| 	stateless          bool
 | |
| 	acked              bool
 | |
| 	init               bool
 | |
| 	deadline           time.Duration
 | |
| 	outBlock           chan struct{}
 | |
| 	subroute           *subHandlerID
 | |
| 	respErr            atomic.Pointer[error]
 | |
| 	clientPingInterval time.Duration
 | |
| }
 | |
| 
 | |
| // Response is a response from the server.
 | |
| type Response struct {
 | |
| 	Msg []byte
 | |
| 	Err error
 | |
| }
 | |
| 
 | |
| func newMuxClient(ctx context.Context, muxID uint64, parent *Connection) *muxClient {
 | |
| 	ctx, cancelFn := context.WithCancelCause(ctx)
 | |
| 	return &muxClient{
 | |
| 		MuxID:              muxID,
 | |
| 		ctx:                ctx,
 | |
| 		cancelFn:           cancelFn,
 | |
| 		parent:             parent,
 | |
| 		LastPong:           time.Now().UnixNano(),
 | |
| 		BaseFlags:          parent.baseFlags,
 | |
| 		clientPingInterval: parent.clientPingInterval,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // roundtrip performs a roundtrip, returning the first response.
 | |
| // This cannot be used concurrently.
 | |
| func (m *muxClient) roundtrip(h HandlerID, req []byte) ([]byte, error) {
 | |
| 	if m.init {
 | |
| 		return nil, errors.New("mux client already used")
 | |
| 	}
 | |
| 	m.init = true
 | |
| 	m.singleResp = true
 | |
| 	msg := message{
 | |
| 		Op:         OpRequest,
 | |
| 		MuxID:      m.MuxID,
 | |
| 		Handler:    h,
 | |
| 		Flags:      m.BaseFlags | FlagEOF,
 | |
| 		Payload:    req,
 | |
| 		DeadlineMS: uint32(m.deadline.Milliseconds()),
 | |
| 	}
 | |
| 	if m.subroute != nil {
 | |
| 		msg.Flags |= FlagSubroute
 | |
| 	}
 | |
| 	ch := make(chan Response, 1)
 | |
| 	m.respMu.Lock()
 | |
| 	if m.closed {
 | |
| 		m.respMu.Unlock()
 | |
| 		return nil, ErrDisconnected
 | |
| 	}
 | |
| 	m.respWait = ch
 | |
| 	m.respMu.Unlock()
 | |
| 	ctx := m.ctx
 | |
| 
 | |
| 	// Add deadline if none.
 | |
| 	if msg.DeadlineMS == 0 {
 | |
| 		msg.DeadlineMS = uint32(defaultSingleRequestTimeout / time.Millisecond)
 | |
| 		var cancel context.CancelFunc
 | |
| 		ctx, cancel = context.WithTimeout(ctx, defaultSingleRequestTimeout)
 | |
| 		defer cancel()
 | |
| 	}
 | |
| 	// Send request
 | |
| 	if err := m.send(msg); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if debugReqs {
 | |
| 		fmt.Println(m.MuxID, m.parent.String(), "SEND")
 | |
| 	}
 | |
| 	// Wait for response or context.
 | |
| 	select {
 | |
| 	case v, ok := <-ch:
 | |
| 		if !ok {
 | |
| 			return nil, ErrDisconnected
 | |
| 		}
 | |
| 		if debugReqs && v.Err != nil {
 | |
| 			v.Err = fmt.Errorf("%d %s RESP ERR: %w", m.MuxID, m.parent.String(), v.Err)
 | |
| 		}
 | |
| 		return v.Msg, v.Err
 | |
| 	case <-ctx.Done():
 | |
| 		if debugReqs {
 | |
| 			return nil, fmt.Errorf("%d %s ERR: %w", m.MuxID, m.parent.String(), context.Cause(ctx))
 | |
| 		}
 | |
| 		return nil, context.Cause(ctx)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // send the message. msg.Seq and msg.MuxID will be set
 | |
| func (m *muxClient) send(msg message) error {
 | |
| 	m.respMu.Lock()
 | |
| 	defer m.respMu.Unlock()
 | |
| 	if m.closed {
 | |
| 		return errors.New("mux client closed")
 | |
| 	}
 | |
| 	return m.sendLocked(msg)
 | |
| }
 | |
| 
 | |
| // sendLocked the message. msg.Seq and msg.MuxID will be set.
 | |
| // m.respMu must be held.
 | |
| func (m *muxClient) sendLocked(msg message) error {
 | |
| 	dst := GetByteBufferCap(msg.Msgsize())
 | |
| 	msg.Seq = m.SendSeq
 | |
| 	msg.MuxID = m.MuxID
 | |
| 	msg.Flags |= m.BaseFlags
 | |
| 	if debugPrint {
 | |
| 		fmt.Println("Client sending", &msg, "to", m.parent.Remote)
 | |
| 	}
 | |
| 	m.SendSeq++
 | |
| 
 | |
| 	dst, err := msg.MarshalMsg(dst)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	if msg.Flags&FlagSubroute != 0 {
 | |
| 		if m.subroute == nil {
 | |
| 			return fmt.Errorf("internal error: subroute not defined on client")
 | |
| 		}
 | |
| 		hid := m.subroute.withHandler(msg.Handler)
 | |
| 		before := len(dst)
 | |
| 		dst = append(dst, hid[:]...)
 | |
| 		if debugPrint {
 | |
| 			fmt.Println("Added subroute", hid.String(), "to message", msg, "len", len(dst)-before)
 | |
| 		}
 | |
| 	}
 | |
| 	if msg.Flags&FlagCRCxxh3 != 0 {
 | |
| 		h := xxh3.Hash(dst)
 | |
| 		dst = binary.LittleEndian.AppendUint32(dst, uint32(h))
 | |
| 	}
 | |
| 	return m.parent.send(m.ctx, dst)
 | |
| }
 | |
| 
 | |
| // RequestStateless will send a single payload request and stream back results.
 | |
| // req may not be read/written to after calling.
 | |
| // TODO: Not implemented
 | |
| func (m *muxClient) RequestStateless(h HandlerID, req []byte, out chan<- Response) {
 | |
| 	if m.init {
 | |
| 		out <- Response{Err: errors.New("mux client already used")}
 | |
| 	}
 | |
| 	m.init = true
 | |
| 
 | |
| 	// Try to grab an initial block.
 | |
| 	m.singleResp = false
 | |
| 	msg := message{
 | |
| 		Op:         OpConnectMux,
 | |
| 		Handler:    h,
 | |
| 		Flags:      FlagEOF,
 | |
| 		Payload:    req,
 | |
| 		DeadlineMS: uint32(m.deadline.Milliseconds()),
 | |
| 	}
 | |
| 	msg.setZeroPayloadFlag()
 | |
| 	if m.subroute != nil {
 | |
| 		msg.Flags |= FlagSubroute
 | |
| 	}
 | |
| 
 | |
| 	// Send...
 | |
| 	err := m.send(msg)
 | |
| 	if err != nil {
 | |
| 		out <- Response{Err: err}
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// Route directly to output.
 | |
| 	m.respWait = out
 | |
| }
 | |
| 
 | |
| // RequestStream will send a single payload request and stream back results.
 | |
| // 'requests' can be nil, in which case only req is sent as input.
 | |
| // It will however take less resources.
 | |
| func (m *muxClient) RequestStream(h HandlerID, payload []byte, requests chan []byte, responses chan Response) (*Stream, error) {
 | |
| 	if m.init {
 | |
| 		return nil, errors.New("mux client already used")
 | |
| 	}
 | |
| 	if responses == nil {
 | |
| 		return nil, errors.New("RequestStream: responses channel is nil")
 | |
| 	}
 | |
| 	m.init = true
 | |
| 	m.respMu.Lock()
 | |
| 	if m.closed {
 | |
| 		m.respMu.Unlock()
 | |
| 		return nil, ErrDisconnected
 | |
| 	}
 | |
| 	m.respWait = responses // Route directly to output.
 | |
| 	m.respMu.Unlock()
 | |
| 
 | |
| 	// Try to grab an initial block.
 | |
| 	m.singleResp = false
 | |
| 	m.RecvSeq = m.SendSeq // Sync
 | |
| 	if cap(requests) > 0 {
 | |
| 		m.outBlock = make(chan struct{}, cap(requests))
 | |
| 	}
 | |
| 	msg := message{
 | |
| 		Op:         OpConnectMux,
 | |
| 		Handler:    h,
 | |
| 		Payload:    payload,
 | |
| 		DeadlineMS: uint32(m.deadline.Milliseconds()),
 | |
| 	}
 | |
| 	msg.setZeroPayloadFlag()
 | |
| 	if requests == nil {
 | |
| 		msg.Flags |= FlagEOF
 | |
| 	}
 | |
| 	if m.subroute != nil {
 | |
| 		msg.Flags |= FlagSubroute
 | |
| 	}
 | |
| 
 | |
| 	// Send...
 | |
| 	err := m.send(msg)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if debugPrint {
 | |
| 		fmt.Println("Connecting Mux", m.MuxID, ",to", m.parent.Remote)
 | |
| 	}
 | |
| 
 | |
| 	// Space for one message and an error.
 | |
| 	responseCh := make(chan Response, 1)
 | |
| 
 | |
| 	// Spawn simple disconnect
 | |
| 	if requests == nil {
 | |
| 		go m.handleOneWayStream(responseCh, responses)
 | |
| 		return &Stream{responses: responseCh, Requests: nil, ctx: m.ctx, cancel: m.cancelFn, muxID: m.MuxID}, nil
 | |
| 	}
 | |
| 
 | |
| 	// Deliver responses and send unblocks back to the server.
 | |
| 	go m.handleTwowayResponses(responseCh, responses)
 | |
| 	go m.handleTwowayRequests(responses, requests)
 | |
| 
 | |
| 	return &Stream{responses: responseCh, Requests: requests, ctx: m.ctx, cancel: m.cancelFn, muxID: m.MuxID}, nil
 | |
| }
 | |
| 
 | |
| func (m *muxClient) addErrorNonBlockingClose(respHandler chan<- Response, err error) {
 | |
| 	m.respMu.Lock()
 | |
| 	defer m.respMu.Unlock()
 | |
| 	if !m.closed {
 | |
| 		m.respErr.Store(&err)
 | |
| 		// Do not block.
 | |
| 		select {
 | |
| 		case respHandler <- Response{Err: err}:
 | |
| 			xioutil.SafeClose(respHandler)
 | |
| 		default:
 | |
| 			go func() {
 | |
| 				respHandler <- Response{Err: err}
 | |
| 				xioutil.SafeClose(respHandler)
 | |
| 			}()
 | |
| 		}
 | |
| 		gridLogIf(m.ctx, m.sendLocked(message{Op: OpDisconnectServerMux, MuxID: m.MuxID}))
 | |
| 		m.closed = true
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // respHandler
 | |
| func (m *muxClient) handleOneWayStream(respHandler chan<- Response, respServer <-chan Response) {
 | |
| 	if debugPrint {
 | |
| 		start := time.Now()
 | |
| 		defer func() {
 | |
| 			fmt.Println("Mux", m.MuxID, "Request took", time.Since(start).Round(time.Millisecond))
 | |
| 		}()
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		// addErrorNonBlockingClose will close the response channel
 | |
| 		// - maybe async, so we shouldn't do it here.
 | |
| 		if m.respErr.Load() == nil {
 | |
| 			xioutil.SafeClose(respHandler)
 | |
| 		}
 | |
| 	}()
 | |
| 	var pingTimer <-chan time.Time
 | |
| 	if m.deadline == 0 || m.deadline > m.clientPingInterval {
 | |
| 		ticker := time.NewTicker(m.clientPingInterval)
 | |
| 		defer ticker.Stop()
 | |
| 		pingTimer = ticker.C
 | |
| 		atomic.StoreInt64(&m.LastPong, time.Now().UnixNano())
 | |
| 	}
 | |
| 	defer m.parent.deleteMux(false, m.MuxID)
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-m.ctx.Done():
 | |
| 			if debugPrint {
 | |
| 				fmt.Println("Client sending disconnect to mux", m.MuxID)
 | |
| 			}
 | |
| 			err := context.Cause(m.ctx)
 | |
| 			if !errors.Is(err, errStreamEOF) {
 | |
| 				m.addErrorNonBlockingClose(respHandler, err)
 | |
| 			}
 | |
| 			return
 | |
| 		case resp, ok := <-respServer:
 | |
| 			if !ok {
 | |
| 				return
 | |
| 			}
 | |
| 		sendResp:
 | |
| 			select {
 | |
| 			case respHandler <- resp:
 | |
| 				m.respMu.Lock()
 | |
| 				if !m.closed {
 | |
| 					gridLogIf(m.ctx, m.sendLocked(message{Op: OpUnblockSrvMux, MuxID: m.MuxID}))
 | |
| 				}
 | |
| 				m.respMu.Unlock()
 | |
| 			case <-m.ctx.Done():
 | |
| 				// Client canceled. Don't block.
 | |
| 				// Next loop will catch it.
 | |
| 			case <-pingTimer:
 | |
| 				if !m.doPing(respHandler) {
 | |
| 					return
 | |
| 				}
 | |
| 				goto sendResp
 | |
| 			}
 | |
| 		case <-pingTimer:
 | |
| 			if !m.doPing(respHandler) {
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // doPing checks last ping time and sends another ping.
 | |
| func (m *muxClient) doPing(respHandler chan<- Response) (ok bool) {
 | |
| 	m.respMu.Lock()
 | |
| 	if m.closed {
 | |
| 		m.respMu.Unlock()
 | |
| 		// Already closed. This is not an error state;
 | |
| 		// we may just be delivering the last responses.
 | |
| 		return true
 | |
| 	}
 | |
| 
 | |
| 	// Only check ping when not closed.
 | |
| 	if got := time.Since(time.Unix(0, atomic.LoadInt64(&m.LastPong))); got > m.clientPingInterval*2 {
 | |
| 		m.respMu.Unlock()
 | |
| 		if debugPrint {
 | |
| 			fmt.Printf("Mux %d: last pong %v ago, disconnecting\n", m.MuxID, got)
 | |
| 		}
 | |
| 		m.addErrorNonBlockingClose(respHandler, ErrDisconnected)
 | |
| 		return false
 | |
| 	}
 | |
| 
 | |
| 	// Send new ping
 | |
| 	err := m.sendLocked(message{Op: OpPing, MuxID: m.MuxID})
 | |
| 	m.respMu.Unlock()
 | |
| 	if err != nil {
 | |
| 		m.addErrorNonBlockingClose(respHandler, err)
 | |
| 	}
 | |
| 	return err == nil
 | |
| }
 | |
| 
 | |
| // responseCh is the channel to that goes to the requester.
 | |
| // internalResp is the channel that comes from the server.
 | |
| func (m *muxClient) handleTwowayResponses(responseCh chan<- Response, internalResp <-chan Response) {
 | |
| 	defer func() {
 | |
| 		m.parent.deleteMux(false, m.MuxID)
 | |
| 		// addErrorNonBlockingClose will close the response channel.
 | |
| 		xioutil.SafeClose(responseCh)
 | |
| 	}()
 | |
| 
 | |
| 	// Cancelation and errors are handled by handleTwowayRequests below.
 | |
| 	for resp := range internalResp {
 | |
| 		m.send(message{Op: OpUnblockSrvMux, MuxID: m.MuxID})
 | |
| 		responseCh <- resp
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (m *muxClient) handleTwowayRequests(errResp chan<- Response, requests <-chan []byte) {
 | |
| 	var errState bool
 | |
| 	if debugPrint {
 | |
| 		start := time.Now()
 | |
| 		defer func() {
 | |
| 			fmt.Println("Mux", m.MuxID, "Request took", time.Since(start).Round(time.Millisecond))
 | |
| 		}()
 | |
| 	}
 | |
| 
 | |
| 	var pingTimer <-chan time.Time
 | |
| 	if m.deadline == 0 || m.deadline > m.clientPingInterval {
 | |
| 		ticker := time.NewTicker(m.clientPingInterval)
 | |
| 		defer ticker.Stop()
 | |
| 		pingTimer = ticker.C
 | |
| 		atomic.StoreInt64(&m.LastPong, time.Now().UnixNano())
 | |
| 	}
 | |
| 
 | |
| 	// Listen for client messages.
 | |
| reqLoop:
 | |
| 	for !errState {
 | |
| 		select {
 | |
| 		case <-m.ctx.Done():
 | |
| 			if debugPrint {
 | |
| 				fmt.Println("Client sending disconnect to mux", m.MuxID)
 | |
| 			}
 | |
| 			m.addErrorNonBlockingClose(errResp, context.Cause(m.ctx))
 | |
| 			errState = true
 | |
| 			continue
 | |
| 		case <-pingTimer:
 | |
| 			if !m.doPing(errResp) {
 | |
| 				errState = true
 | |
| 				continue
 | |
| 			}
 | |
| 		case req, ok := <-requests:
 | |
| 			if !ok {
 | |
| 				// Done send EOF
 | |
| 				if debugPrint {
 | |
| 					fmt.Println("Client done, sending EOF to mux", m.MuxID)
 | |
| 				}
 | |
| 				msg := message{
 | |
| 					Op:    OpMuxClientMsg,
 | |
| 					MuxID: m.MuxID,
 | |
| 					Flags: FlagEOF,
 | |
| 				}
 | |
| 				msg.setZeroPayloadFlag()
 | |
| 				err := m.send(msg)
 | |
| 				if err != nil {
 | |
| 					m.addErrorNonBlockingClose(errResp, err)
 | |
| 				}
 | |
| 				break reqLoop
 | |
| 			}
 | |
| 			// Grab a send token.
 | |
| 		sendReq:
 | |
| 			select {
 | |
| 			case <-m.ctx.Done():
 | |
| 				m.addErrorNonBlockingClose(errResp, context.Cause(m.ctx))
 | |
| 				errState = true
 | |
| 				continue
 | |
| 			case <-pingTimer:
 | |
| 				if !m.doPing(errResp) {
 | |
| 					errState = true
 | |
| 					continue
 | |
| 				}
 | |
| 				goto sendReq
 | |
| 			case <-m.outBlock:
 | |
| 			}
 | |
| 			msg := message{
 | |
| 				Op:      OpMuxClientMsg,
 | |
| 				MuxID:   m.MuxID,
 | |
| 				Seq:     1,
 | |
| 				Payload: req,
 | |
| 			}
 | |
| 			msg.setZeroPayloadFlag()
 | |
| 			err := m.send(msg)
 | |
| 			PutByteBuffer(req)
 | |
| 			if err != nil {
 | |
| 				m.addErrorNonBlockingClose(errResp, err)
 | |
| 				errState = true
 | |
| 				continue
 | |
| 			}
 | |
| 			msg.Seq++
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if errState {
 | |
| 		// Drain requests.
 | |
| 		for {
 | |
| 			select {
 | |
| 			case r, ok := <-requests:
 | |
| 				if !ok {
 | |
| 					return
 | |
| 				}
 | |
| 				PutByteBuffer(r)
 | |
| 			default:
 | |
| 				return
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	for !errState {
 | |
| 		select {
 | |
| 		case <-m.ctx.Done():
 | |
| 			if debugPrint {
 | |
| 				fmt.Println("Client sending disconnect to mux", m.MuxID)
 | |
| 			}
 | |
| 			m.addErrorNonBlockingClose(errResp, context.Cause(m.ctx))
 | |
| 			return
 | |
| 		case <-pingTimer:
 | |
| 			errState = !m.doPing(errResp)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // checkSeq will check if sequence number is correct and increment it by 1.
 | |
| func (m *muxClient) checkSeq(seq uint32) (ok bool) {
 | |
| 	if seq != m.RecvSeq {
 | |
| 		if debugPrint {
 | |
| 			fmt.Printf("MuxID: %d client, expected sequence %d, got %d\n", m.MuxID, m.RecvSeq, seq)
 | |
| 		}
 | |
| 		m.addResponse(Response{Err: ErrIncorrectSequence})
 | |
| 		return false
 | |
| 	}
 | |
| 	m.RecvSeq++
 | |
| 	return true
 | |
| }
 | |
| 
 | |
| // response will send handleIncoming response to client.
 | |
| // may never block.
 | |
| // Should return whether the next call would block.
 | |
| func (m *muxClient) response(seq uint32, r Response) {
 | |
| 	if debugReqs {
 | |
| 		fmt.Println(m.MuxID, m.parent.String(), "RESP")
 | |
| 	}
 | |
| 	if debugPrint {
 | |
| 		fmt.Printf("mux %d: got msg seqid %d, payload length: %d, err:%v\n", m.MuxID, seq, len(r.Msg), r.Err)
 | |
| 	}
 | |
| 	if !m.checkSeq(seq) {
 | |
| 		if debugReqs {
 | |
| 			fmt.Println(m.MuxID, m.parent.String(), "CHECKSEQ FAIL", m.RecvSeq, seq)
 | |
| 		}
 | |
| 		PutByteBuffer(r.Msg)
 | |
| 		r.Msg = nil
 | |
| 		r.Err = ErrIncorrectSequence
 | |
| 		m.addResponse(r)
 | |
| 		return
 | |
| 	}
 | |
| 	atomic.StoreInt64(&m.LastPong, time.Now().UnixNano())
 | |
| 	ok := m.addResponse(r)
 | |
| 	if !ok {
 | |
| 		PutByteBuffer(r.Msg)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| var errStreamEOF = errors.New("stream EOF")
 | |
| 
 | |
| // error is a message from the server to disconnect.
 | |
| func (m *muxClient) error(err RemoteErr) {
 | |
| 	if debugPrint {
 | |
| 		fmt.Printf("mux %d: got remote err:%v\n", m.MuxID, string(err))
 | |
| 	}
 | |
| 	m.addResponse(Response{Err: &err})
 | |
| }
 | |
| 
 | |
| func (m *muxClient) ack(seq uint32) {
 | |
| 	if !m.checkSeq(seq) {
 | |
| 		return
 | |
| 	}
 | |
| 	if m.acked || m.outBlock == nil {
 | |
| 		return
 | |
| 	}
 | |
| 	available := cap(m.outBlock)
 | |
| 	for i := 0; i < available; i++ {
 | |
| 		m.outBlock <- struct{}{}
 | |
| 	}
 | |
| 	m.acked = true
 | |
| }
 | |
| 
 | |
| func (m *muxClient) unblockSend(seq uint32) {
 | |
| 	if !m.checkSeq(seq) {
 | |
| 		return
 | |
| 	}
 | |
| 	select {
 | |
| 	case m.outBlock <- struct{}{}:
 | |
| 	default:
 | |
| 		gridLogIf(m.ctx, errors.New("output unblocked overflow"))
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (m *muxClient) pong(msg pongMsg) {
 | |
| 	if msg.NotFound || msg.Err != nil {
 | |
| 		err := errors.New("remote terminated call")
 | |
| 		if msg.Err != nil {
 | |
| 			err = fmt.Errorf("remove pong failed: %v", &msg.Err)
 | |
| 		}
 | |
| 		m.addResponse(Response{Err: err})
 | |
| 		return
 | |
| 	}
 | |
| 	atomic.StoreInt64(&m.LastPong, time.Now().UnixNano())
 | |
| }
 | |
| 
 | |
| // addResponse will add a response to the response channel.
 | |
| // This function will never block
 | |
| func (m *muxClient) addResponse(r Response) (ok bool) {
 | |
| 	m.respMu.Lock()
 | |
| 	defer m.respMu.Unlock()
 | |
| 	if m.closed {
 | |
| 		return false
 | |
| 	}
 | |
| 	select {
 | |
| 	case m.respWait <- r:
 | |
| 		if r.Err != nil {
 | |
| 			if debugPrint {
 | |
| 				fmt.Println("Closing mux", m.MuxID, "due to error:", r.Err)
 | |
| 			}
 | |
| 			m.closeLocked()
 | |
| 		}
 | |
| 		return true
 | |
| 	default:
 | |
| 		if m.stateless {
 | |
| 			// Drop message if not stateful.
 | |
| 			return
 | |
| 		}
 | |
| 		err := errors.New("INTERNAL ERROR: Response was blocked")
 | |
| 		gridLogIf(m.ctx, err)
 | |
| 		m.closeLocked()
 | |
| 		return false
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (m *muxClient) close() {
 | |
| 	if debugPrint {
 | |
| 		fmt.Println("closing outgoing mux", m.MuxID)
 | |
| 	}
 | |
| 	if !m.respMu.TryLock() {
 | |
| 		// Cancel before locking - will unblock any pending sends.
 | |
| 		if m.cancelFn != nil {
 | |
| 			m.cancelFn(context.Canceled)
 | |
| 		}
 | |
| 		// Wait for senders to release.
 | |
| 		m.respMu.Lock()
 | |
| 	}
 | |
| 
 | |
| 	defer m.respMu.Unlock()
 | |
| 	m.closeLocked()
 | |
| }
 | |
| 
 | |
| func (m *muxClient) closeLocked() {
 | |
| 	if m.closed {
 | |
| 		return
 | |
| 	}
 | |
| 	// We hold the lock, so nobody can modify m.respWait while we're closing.
 | |
| 	if m.respWait != nil {
 | |
| 		xioutil.SafeClose(m.respWait)
 | |
| 		m.respWait = nil
 | |
| 	}
 | |
| 	m.closed = true
 | |
| }
 |