diff --git a/include/haproxy/mux_spop-t.h b/include/haproxy/mux_spop-t.h index 181ef51c0..2e6c3c5de 100644 --- a/include/haproxy/mux_spop-t.h +++ b/include/haproxy/mux_spop-t.h @@ -59,7 +59,7 @@ static forceinline char *spop_conn_show_flags(char *buf, size_t len, const char /**** SPOP stream flags (32 bit), in spop_strm->flags ****/ #define SPOP_SF_NONE 0x00000000 -// #define SPOP_SF_ACK_RCVD 0x00000001 /* ACK freme received */ +#define SPOP_SF_ACK_RCVD 0x00000001 /* ACK freme received */ //#define SPOP_SF_ES_SENT 0x00000002 /* end-of-stream sent */ //#define SPOP_SF_EP_SENT 0x00000004 /* end-of-param sent */ //#define SPOP_SF_DISCON_SENT 0x00000008 /* disconnect sent */ @@ -83,7 +83,7 @@ static forceinline char *spop_strm_show_flags(char *buf, size_t len, const char /* prologue */ _(0); /* flags */ - _(SPOP_SF_BLK_MBUSY, _(SPOP_SF_BLK_MROOM, _(SPOP_SF_NOTIFIED))); + _(SPOP_SF_ACK_RCVD, _(SPOP_SF_BLK_MBUSY, _(SPOP_SF_BLK_MROOM, _(SPOP_SF_NOTIFIED)))); /* epilogue */ _(~0U); return buf; diff --git a/src/mux_spop.c b/src/mux_spop.c index 96d62d0f8..5fb6a2440 100644 --- a/src/mux_spop.c +++ b/src/mux_spop.c @@ -1114,9 +1114,12 @@ static inline void spop_strm_close(struct spop_strm *spop_strm) */ static inline void spop_strm_propagate_term_flags(struct spop_conn *spop_conn, struct spop_strm *spop_strm) { + if (spop_strm->flags & SPOP_SF_ACK_RCVD) { + se_fl_set(spop_strm->sd, SE_FL_EOI); + } if (spop_conn_read0_pending(spop_conn) || spop_strm->state == SPOP_SS_CLOSED) { se_fl_set(spop_strm->sd, SE_FL_EOS); - if (spop_conn->errcode) + if (!se_fl_test(spop_strm->sd, SE_FL_EOI)) se_fl_set(spop_strm->sd, SE_FL_ERROR); } if (se_fl_test(spop_strm->sd, SE_FL_ERR_PENDING)) @@ -1270,9 +1273,8 @@ static void spop_strm_wake_one_stream(struct spop_strm *spop_strm) } if (spop_conn->state == SPOP_CS_CLOSED || (spop_conn->flags & (SPOP_CF_ERR_PENDING|SPOP_CF_ERROR))) { - if (spop_conn->state == SPOP_CS_CLOSED || (spop_conn->flags & SPOP_CF_ERROR)) - se_fl_set(spop_strm->sd, SE_FL_EOS); se_fl_set_error(spop_strm->sd); + spop_strm_propagate_term_flags(spop_conn, spop_strm); if (!spop_strm->sd->abort_info.info) { spop_strm->sd->abort_info.info = (SE_ABRT_SRC_MUX_SPOP << SE_ABRT_SRC_SHIFT); spop_strm->sd->abort_info.code = spop_conn->errcode; @@ -1920,6 +1922,7 @@ static int spop_conn_handle_ack(struct spop_conn *spop_conn, struct spop_strm *s spop_strm_close(spop_strm); end: + spop_strm->flags |= SPOP_SF_ACK_RCVD; TRACE_PROTO("SPOP AGENT ACK frame rcvd", SPOP_EV_RX_FRAME|SPOP_EV_RX_ACK, spop_conn->conn, spop_strm, 0, (size_t[]){sent}); spop_conn->state = SPOP_CS_FRAME_H; TRACE_LEAVE(SPOP_EV_RX_FRAME|SPOP_EV_RX_ACK, spop_conn->conn, spop_strm); @@ -2101,6 +2104,7 @@ static void spop_process_demux(struct spop_conn *spop_conn) (b_data(&spop_strm->rxbuf) || spop_conn_read0_pending(spop_conn) || spop_strm->state == SPOP_SS_CLOSED || + (spop_strm->flags & SPOP_SF_ACK_RCVD) || se_fl_test(spop_strm->sd, SE_FL_ERROR | SE_FL_ERR_PENDING | SE_FL_EOS))) { /* we may have to signal the upper layers */ TRACE_DEVEL("notifying stream before switching SID", SPOP_EV_RX_FRAME|SPOP_EV_STRM_WAKE, spop_conn->conn, spop_strm); @@ -2205,6 +2209,7 @@ static void spop_process_demux(struct spop_conn *spop_conn) (b_data(&spop_strm->rxbuf) || spop_conn_read0_pending(spop_conn) || spop_strm->state == SPOP_SS_CLOSED || + (spop_strm->flags & SPOP_SF_ACK_RCVD) || se_fl_test(spop_strm->sd, SE_FL_ERROR | SE_FL_ERR_PENDING | SE_FL_EOS))) { /* we may have to signal the upper layers */ TRACE_DEVEL("notifying stream before switching SID", SPOP_EV_RX_FRAME|SPOP_EV_STRM_WAKE, spop_conn->conn, spop_strm);