mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-07 15:47:01 +02:00
MEDIUM: channel: Use CF_READ_EVENT instead of CF_READ_PARTIAL
CF_READ_PARTIAL flag is now merged with CF_READ_EVENT. It means CF_READ_EVENT is set when a read0 is received (formely CF_READ_NULL) or when data are received (formely CF_READ_ACTIVITY). There is nothing special here, except conditions to wake the stream up in sc_notify(). Indeed, the test was a bit changed to reflect recent change. read0 event is now formalized by (CF_READ_EVENT + CF_SHUTR).
This commit is contained in:
parent
b96f2aa380
commit
285f7616ee
@ -54,10 +54,10 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#define CF_READ_EVENT 0x00000001 /* a read event detected on producer side */
|
#define CF_READ_EVENT 0x00000001 /* a read event detected on producer side */
|
||||||
#define CF_READ_PARTIAL 0x00000002 /* some data were read from producer or a read exception occurred */
|
/* unused: 0x00000002 */
|
||||||
#define CF_READ_TIMEOUT 0x00000004 /* timeout while waiting for producer */
|
#define CF_READ_TIMEOUT 0x00000004 /* timeout while waiting for producer */
|
||||||
#define CF_READ_ERROR 0x00000008 /* unrecoverable error on producer side */
|
#define CF_READ_ERROR 0x00000008 /* unrecoverable error on producer side */
|
||||||
#define CF_READ_ACTIVITY (CF_READ_EVENT|CF_READ_PARTIAL|CF_READ_ERROR)
|
#define CF_READ_ACTIVITY (CF_READ_EVENT|CF_READ_ERROR)
|
||||||
|
|
||||||
/* unused: 0x00000010 */
|
/* unused: 0x00000010 */
|
||||||
#define CF_SHUTR 0x00000020 /* producer has already shut down */
|
#define CF_SHUTR 0x00000020 /* producer has already shut down */
|
||||||
@ -138,7 +138,7 @@ static forceinline char *chn_show_flags(char *buf, size_t len, const char *delim
|
|||||||
/* prologue */
|
/* prologue */
|
||||||
_(0);
|
_(0);
|
||||||
/* flags */
|
/* flags */
|
||||||
_(CF_READ_EVENT, _(CF_READ_PARTIAL, _(CF_READ_TIMEOUT, _(CF_READ_ERROR,
|
_(CF_READ_EVENT, _(CF_READ_TIMEOUT, _(CF_READ_ERROR,
|
||||||
_(CF_SHUTR, _(CF_SHUTR_NOW, _(CF_READ_NOEXP, _(CF_WRITE_EVENT,
|
_(CF_SHUTR, _(CF_SHUTR_NOW, _(CF_READ_NOEXP, _(CF_WRITE_EVENT,
|
||||||
_(CF_WRITE_PARTIAL, _(CF_WRITE_TIMEOUT, _(CF_WRITE_ERROR,
|
_(CF_WRITE_PARTIAL, _(CF_WRITE_TIMEOUT, _(CF_WRITE_ERROR,
|
||||||
_(CF_WAKE_WRITE, _(CF_SHUTW, _(CF_SHUTW_NOW, _(CF_AUTO_CLOSE,
|
_(CF_WAKE_WRITE, _(CF_SHUTW, _(CF_SHUTW_NOW, _(CF_AUTO_CLOSE,
|
||||||
@ -146,7 +146,7 @@ static forceinline char *chn_show_flags(char *buf, size_t len, const char *delim
|
|||||||
_(CF_READ_ATTACHED, _(CF_KERN_SPLICING, _(CF_READ_DONTWAIT,
|
_(CF_READ_ATTACHED, _(CF_KERN_SPLICING, _(CF_READ_DONTWAIT,
|
||||||
_(CF_AUTO_CONNECT, _(CF_DONT_READ, _(CF_EXPECT_MORE,
|
_(CF_AUTO_CONNECT, _(CF_DONT_READ, _(CF_EXPECT_MORE,
|
||||||
_(CF_SEND_DONTWAIT, _(CF_NEVER_WAIT, _(CF_WAKE_ONCE, _(CF_FLT_ANALYZE,
|
_(CF_SEND_DONTWAIT, _(CF_NEVER_WAIT, _(CF_WAKE_ONCE, _(CF_FLT_ANALYZE,
|
||||||
_(CF_EOI, _(CF_ISRESP)))))))))))))))))))))))))))))));
|
_(CF_EOI, _(CF_ISRESP))))))))))))))))))))))))))))));
|
||||||
/* epilogue */
|
/* epilogue */
|
||||||
_(~0U);
|
_(~0U);
|
||||||
return buf;
|
return buf;
|
||||||
|
@ -378,7 +378,7 @@ static inline void channel_add_input(struct channel *chn, unsigned int len)
|
|||||||
}
|
}
|
||||||
/* notify that some data was read */
|
/* notify that some data was read */
|
||||||
chn->total += len;
|
chn->total += len;
|
||||||
chn->flags |= CF_READ_PARTIAL;
|
chn->flags |= CF_READ_EVENT;
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline unsigned long long channel_htx_forward(struct channel *chn, struct htx *htx, unsigned long long bytes)
|
static inline unsigned long long channel_htx_forward(struct channel *chn, struct htx *htx, unsigned long long bytes)
|
||||||
|
@ -1849,7 +1849,7 @@ static int connect_server(struct stream *s)
|
|||||||
* care of it.
|
* care of it.
|
||||||
*/
|
*/
|
||||||
if (sc_ep_test(s->scb, SE_FL_EOI) && !(sc_ic(s->scb)->flags & CF_EOI))
|
if (sc_ep_test(s->scb, SE_FL_EOI) && !(sc_ic(s->scb)->flags & CF_EOI))
|
||||||
sc_ic(s->scb)->flags |= (CF_EOI|CF_READ_PARTIAL);
|
sc_ic(s->scb)->flags |= (CF_EOI|CF_READ_EVENT);
|
||||||
|
|
||||||
/* catch all sync connect while the mux is not already installed */
|
/* catch all sync connect while the mux is not already installed */
|
||||||
if (!srv_conn->mux && !(srv_conn->flags & CO_FL_WAIT_XPRT)) {
|
if (!srv_conn->mux && !(srv_conn->flags & CO_FL_WAIT_XPRT)) {
|
||||||
|
@ -119,7 +119,7 @@ int ci_putchr(struct channel *chn, char c)
|
|||||||
*ci_tail(chn) = c;
|
*ci_tail(chn) = c;
|
||||||
|
|
||||||
b_add(&chn->buf, 1);
|
b_add(&chn->buf, 1);
|
||||||
chn->flags |= CF_READ_PARTIAL;
|
chn->flags |= CF_READ_EVENT;
|
||||||
|
|
||||||
if (chn->to_forward >= 1) {
|
if (chn->to_forward >= 1) {
|
||||||
if (chn->to_forward != CHN_INFINITE_FORWARD)
|
if (chn->to_forward != CHN_INFINITE_FORWARD)
|
||||||
|
27
src/stconn.c
27
src/stconn.c
@ -1186,19 +1186,22 @@ static void sc_notify(struct stconn *sc)
|
|||||||
(sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM))) {
|
(sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM))) {
|
||||||
ic->rex = TICK_ETERNITY;
|
ic->rex = TICK_ETERNITY;
|
||||||
}
|
}
|
||||||
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL)) == CF_READ_PARTIAL) {
|
else if ((ic->flags & (CF_SHUTR|CF_READ_EVENT)) == CF_READ_EVENT) {
|
||||||
/* we must re-enable reading if sc_chk_snd() has freed some space */
|
/* we must re-enable reading if sc_chk_snd() has freed some space */
|
||||||
if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
|
if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
|
||||||
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
ic->rex = tick_add_ifset(now_ms, ic->rto);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* wake the task up only when needed */
|
/* wake the task up only when needed */
|
||||||
if (/* changes on the production side */
|
if (/* changes on the production side that must be handled:
|
||||||
(ic->flags & (CF_READ_EVENT|CF_READ_ERROR)) ||
|
* - An error on receipt: CF_READ_ERROR or SE_FL_ERROR
|
||||||
!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST) ||
|
* - A read event: shutdown for reads (CF_READ_EVENT + SHUTR)
|
||||||
sc_ep_test(sc, SE_FL_ERROR) ||
|
* end of input (CF_READ_EVENT + CF_EOI)
|
||||||
((ic->flags & CF_READ_PARTIAL) &&
|
* data received and no fast-forwarding (CF_READ_EVENT + !to_forward)
|
||||||
((ic->flags & CF_EOI) || !ic->to_forward || sco->state != SC_ST_EST)) ||
|
* read event while consumer side is not established (CF_READ_EVENT + sco->state != SC_ST_EST)
|
||||||
|
*/
|
||||||
|
((ic->flags & CF_READ_EVENT) && ((ic->flags & (CF_SHUTR|CF_EOI)) || !ic->to_forward || sco->state != SC_ST_EST)) ||
|
||||||
|
(ic->flags & CF_READ_ERROR) || sc_ep_test(sc, SE_FL_ERROR) ||
|
||||||
|
|
||||||
/* changes on the consumption side */
|
/* changes on the consumption side */
|
||||||
(oc->flags & (CF_WRITE_EVENT|CF_WRITE_ERROR)) ||
|
(oc->flags & (CF_WRITE_EVENT|CF_WRITE_ERROR)) ||
|
||||||
@ -1371,7 +1374,7 @@ static int sc_conn_recv(struct stconn *sc)
|
|||||||
ic->to_forward -= ret;
|
ic->to_forward -= ret;
|
||||||
ic->total += ret;
|
ic->total += ret;
|
||||||
cur_read += ret;
|
cur_read += ret;
|
||||||
ic->flags |= CF_READ_PARTIAL;
|
ic->flags |= CF_READ_EVENT;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR))
|
if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR))
|
||||||
@ -1455,7 +1458,7 @@ static int sc_conn_recv(struct stconn *sc)
|
|||||||
/* Add READ_PARTIAL because some data are pending but
|
/* Add READ_PARTIAL because some data are pending but
|
||||||
* cannot be xferred to the channel
|
* cannot be xferred to the channel
|
||||||
*/
|
*/
|
||||||
ic->flags |= CF_READ_PARTIAL;
|
ic->flags |= CF_READ_EVENT;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ret <= 0) {
|
if (ret <= 0) {
|
||||||
@ -1482,7 +1485,7 @@ static int sc_conn_recv(struct stconn *sc)
|
|||||||
c_adv(ic, fwd);
|
c_adv(ic, fwd);
|
||||||
}
|
}
|
||||||
|
|
||||||
ic->flags |= CF_READ_PARTIAL;
|
ic->flags |= CF_READ_EVENT;
|
||||||
ic->total += ret;
|
ic->total += ret;
|
||||||
|
|
||||||
/* End-of-input reached, we can leave. In this case, it is
|
/* End-of-input reached, we can leave. In this case, it is
|
||||||
@ -1577,7 +1580,7 @@ static int sc_conn_recv(struct stconn *sc)
|
|||||||
/* Report EOI on the channel if it was reached from the mux point of
|
/* Report EOI on the channel if it was reached from the mux point of
|
||||||
* view. */
|
* view. */
|
||||||
if (sc_ep_test(sc, SE_FL_EOI) && !(ic->flags & CF_EOI)) {
|
if (sc_ep_test(sc, SE_FL_EOI) && !(ic->flags & CF_EOI)) {
|
||||||
ic->flags |= (CF_EOI|CF_READ_PARTIAL);
|
ic->flags |= (CF_EOI|CF_READ_EVENT);
|
||||||
ret = 1;
|
ret = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1880,7 +1883,7 @@ static int sc_conn_process(struct stconn *sc)
|
|||||||
* care of it.
|
* care of it.
|
||||||
*/
|
*/
|
||||||
if (sc_ep_test(sc, SE_FL_EOI) && !(ic->flags & CF_EOI))
|
if (sc_ep_test(sc, SE_FL_EOI) && !(ic->flags & CF_EOI))
|
||||||
ic->flags |= (CF_EOI|CF_READ_PARTIAL);
|
ic->flags |= (CF_EOI|CF_READ_EVENT);
|
||||||
|
|
||||||
/* Second step : update the stream connector and channels, try to forward any
|
/* Second step : update the stream connector and channels, try to forward any
|
||||||
* pending data, then possibly wake the stream up based on the new
|
* pending data, then possibly wake the stream up based on the new
|
||||||
|
12
src/stream.c
12
src/stream.c
@ -294,7 +294,7 @@ int stream_upgrade_from_sc(struct stconn *sc, struct buffer *input)
|
|||||||
s->req.buf = *input;
|
s->req.buf = *input;
|
||||||
*input = BUF_NULL;
|
*input = BUF_NULL;
|
||||||
s->req.total = (IS_HTX_STRM(s) ? htxbuf(&s->req.buf)->data : b_data(&s->req.buf));
|
s->req.total = (IS_HTX_STRM(s) ? htxbuf(&s->req.buf)->data : b_data(&s->req.buf));
|
||||||
s->req.flags |= (s->req.total ? CF_READ_PARTIAL : 0);
|
s->req.flags |= (s->req.total ? CF_READ_EVENT : 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
s->flags &= ~SF_IGNORE;
|
s->flags &= ~SF_IGNORE;
|
||||||
@ -567,7 +567,7 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer
|
|||||||
s->req.buf = *input;
|
s->req.buf = *input;
|
||||||
*input = BUF_NULL;
|
*input = BUF_NULL;
|
||||||
s->req.total = (IS_HTX_STRM(s) ? htxbuf(&s->req.buf)->data : b_data(&s->req.buf));
|
s->req.total = (IS_HTX_STRM(s) ? htxbuf(&s->req.buf)->data : b_data(&s->req.buf));
|
||||||
s->req.flags |= (s->req.total ? CF_READ_PARTIAL : 0);
|
s->req.flags |= (s->req.total ? CF_READ_EVENT : 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* it is important not to call the wakeup function directly but to
|
/* it is important not to call the wakeup function directly but to
|
||||||
@ -1516,7 +1516,7 @@ int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_prot
|
|||||||
}
|
}
|
||||||
sc_conn_commit_endp_upgrade(sc);
|
sc_conn_commit_endp_upgrade(sc);
|
||||||
|
|
||||||
s->req.flags &= ~(CF_READ_PARTIAL|CF_AUTO_CONNECT);
|
s->req.flags &= ~(CF_READ_EVENT|CF_AUTO_CONNECT);
|
||||||
s->req.total = 0;
|
s->req.total = 0;
|
||||||
s->flags |= SF_IGNORE;
|
s->flags |= SF_IGNORE;
|
||||||
if (sc_ep_test(sc, SE_FL_DETACHED)) {
|
if (sc_ep_test(sc, SE_FL_DETACHED)) {
|
||||||
@ -1553,8 +1553,8 @@ static void stream_update_both_sc(struct stream *s)
|
|||||||
struct channel *req = &s->req;
|
struct channel *req = &s->req;
|
||||||
struct channel *res = &s->res;
|
struct channel *res = &s->res;
|
||||||
|
|
||||||
req->flags &= ~(CF_READ_EVENT|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_EVENT|CF_WRITE_PARTIAL);
|
req->flags &= ~(CF_READ_EVENT|CF_READ_ATTACHED|CF_WRITE_EVENT|CF_WRITE_PARTIAL);
|
||||||
res->flags &= ~(CF_READ_EVENT|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_EVENT|CF_WRITE_PARTIAL);
|
res->flags &= ~(CF_READ_EVENT|CF_READ_ATTACHED|CF_WRITE_EVENT|CF_WRITE_PARTIAL);
|
||||||
|
|
||||||
s->prev_conn_state = scb->state;
|
s->prev_conn_state = scb->state;
|
||||||
|
|
||||||
@ -1710,7 +1710,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
|
|||||||
* to a bogus analyser or the fact that we're ignoring a read0. The
|
* to a bogus analyser or the fact that we're ignoring a read0. The
|
||||||
* call_rate counter only counts calls with no progress made.
|
* call_rate counter only counts calls with no progress made.
|
||||||
*/
|
*/
|
||||||
if (!((req->flags | res->flags) & (CF_READ_PARTIAL|CF_WRITE_PARTIAL))) {
|
if (!((req->flags | res->flags) & (CF_READ_EVENT|CF_WRITE_PARTIAL))) {
|
||||||
rate = update_freq_ctr(&s->call_rate, 1);
|
rate = update_freq_ctr(&s->call_rate, 1);
|
||||||
if (rate >= 100000 && s->call_rate.prev_ctr) // make sure to wait at least a full second
|
if (rate >= 100000 && s->call_rate.prev_ctr) // make sure to wait at least a full second
|
||||||
stream_dump_and_crash(&s->obj_type, read_freq_ctr(&s->call_rate));
|
stream_dump_and_crash(&s->obj_type, read_freq_ctr(&s->call_rate));
|
||||||
|
Loading…
Reference in New Issue
Block a user