mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-06 15:17:01 +02:00
MINOR: stream_interface: Give stream_interface its own wait_list.
Instead of just using the conn_stream wait_list, give the stream_interface its own. When the conn_stream will have its own buffers, the stream_interface may have to wait on it.
This commit is contained in:
parent
91894cbf4c
commit
8f0b4c66f5
@ -116,7 +116,7 @@ static inline struct stream_interface *si_opposite(struct stream_interface *si)
|
||||
* any endpoint and only keeps its side which is expected to have already been
|
||||
* set.
|
||||
*/
|
||||
static inline void si_reset(struct stream_interface *si)
|
||||
static inline int si_reset(struct stream_interface *si)
|
||||
{
|
||||
si->err_type = SI_ET_NONE;
|
||||
si->conn_retries = 0; /* used for logging too */
|
||||
@ -125,6 +125,14 @@ static inline void si_reset(struct stream_interface *si)
|
||||
si->end = NULL;
|
||||
si->state = si->prev_state = SI_ST_INI;
|
||||
si->ops = &si_embedded_ops;
|
||||
si->wait_list.task = tasklet_new();
|
||||
if (!si->wait_list.task)
|
||||
return -1;
|
||||
si->wait_list.task->process = si_cs_io_cb;
|
||||
si->wait_list.task->context = si;
|
||||
si->wait_list.wait_reason = 0;
|
||||
LIST_INIT(&si->wait_list.list);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* sets the current and previous state of a stream interface to <state>. This
|
||||
|
@ -101,6 +101,7 @@ struct stream_interface {
|
||||
unsigned int err_type; /* first error detected, one of SI_ET_* */
|
||||
int conn_retries; /* number of connect retries left */
|
||||
unsigned int hcto; /* half-closed timeout (0 = unset) */
|
||||
struct wait_list wait_list; /* We're in a wait list */
|
||||
};
|
||||
|
||||
/* operations available on a stream-interface */
|
||||
|
11
src/stream.c
11
src/stream.c
@ -192,7 +192,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
|
||||
vars_init(&s->vars_reqres, SCOPE_REQ);
|
||||
|
||||
/* this part should be common with other protocols */
|
||||
si_reset(&s->si[0]);
|
||||
if (si_reset(&s->si[0]) < 0)
|
||||
goto out_fail_alloc;
|
||||
si_set_state(&s->si[0], SI_ST_EST);
|
||||
s->si[0].hcto = sess->fe->timeout.clientfin;
|
||||
|
||||
@ -211,7 +212,8 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
|
||||
/* pre-initialize the other side's stream interface to an INIT state. The
|
||||
* callbacks will be initialized before attempting to connect.
|
||||
*/
|
||||
si_reset(&s->si[1]);
|
||||
if (si_reset(&s->si[1]) < 0)
|
||||
goto out_fail_alloc_si1;
|
||||
s->si[1].hcto = TICK_ETERNITY;
|
||||
|
||||
if (likely(sess->fe->options2 & PR_O2_INDEPSTR))
|
||||
@ -288,6 +290,9 @@ struct stream *stream_new(struct session *sess, enum obj_type *origin)
|
||||
out_fail_accept:
|
||||
flt_stream_release(s, 0);
|
||||
task_free(t);
|
||||
tasklet_free(s->si[1].wait_list.task);
|
||||
out_fail_alloc_si1:
|
||||
tasklet_free(s->si[0].wait_list.task);
|
||||
out_fail_alloc:
|
||||
LIST_DEL(&s->list);
|
||||
pool_free(pool_head_stream, s);
|
||||
@ -403,6 +408,8 @@ static void stream_free(struct stream *s)
|
||||
if (must_free_sess)
|
||||
session_free(sess);
|
||||
|
||||
tasklet_free(s->si[0].wait_list.task);
|
||||
tasklet_free(s->si[1].wait_list.task);
|
||||
pool_free(pool_head_stream, s);
|
||||
|
||||
/* We may want to free the maximum amount of pools if the proxy is stopping */
|
||||
|
@ -651,7 +651,7 @@ static struct task * si_cs_send(struct conn_stream *cs)
|
||||
int did_send = 0;
|
||||
|
||||
/* We're already waiting to be able to send, give up */
|
||||
if (cs->wait_list.wait_reason & SUB_CAN_SEND)
|
||||
if (si->wait_list.wait_reason & SUB_CAN_SEND)
|
||||
return NULL;
|
||||
|
||||
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
|
||||
@ -660,7 +660,7 @@ static struct task * si_cs_send(struct conn_stream *cs)
|
||||
if (conn->flags & CO_FL_HANDSHAKE) {
|
||||
/* a handshake was requested */
|
||||
/* Schedule ourself to be woken up once the handshake is done */
|
||||
conn->xprt->subscribe(conn, SUB_CAN_SEND, wl_set_waitcb(&cs->wait_list, si_cs_io_cb, cs));
|
||||
conn->xprt->subscribe(conn, SUB_CAN_SEND, &si->wait_list);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -740,7 +740,7 @@ static struct task * si_cs_send(struct conn_stream *cs)
|
||||
}
|
||||
/* We couldn't send all of our data, let the mux know we'd like to send more */
|
||||
if (co_data(oc))
|
||||
conn->mux->subscribe(cs, SUB_CAN_SEND, wl_set_waitcb(&cs->wait_list, si_cs_io_cb, cs));
|
||||
conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_list);
|
||||
|
||||
wake_others:
|
||||
/* Maybe somebody was waiting for this conn_stream, wake them */
|
||||
@ -759,7 +759,9 @@ static struct task * si_cs_send(struct conn_stream *cs)
|
||||
|
||||
struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state)
|
||||
{
|
||||
si_cs_send(ctx);
|
||||
struct stream_interface *si = ctx;
|
||||
if (!(si->wait_list.wait_reason & SUB_CAN_SEND))
|
||||
si_cs_send(__objt_cs(si->end));
|
||||
return (NULL);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user