CLEANUP: connection: rename subscription events values and event field

The SUB_CAN_SEND/SUB_CAN_RECV enum values have been confusing a few
times, especially when checking them on reading. After some discussion,
it appears that calling them SUB_RETRY_SEND/SUB_RETRY_RECV more
accurately reflects their purpose since these events may only appear
after a first attempt to perform the I/O operation has failed or was
not completed.

In addition the wait_reason field in struct wait_event which carries
them makes one think that a single reason may happen at once while
it is in fact a set of events. Since the struct is called wait_event
it makes sense that this field is called "events" to indicate it's the
list of events we're subscribed to.

Last, the values for SUB_RETRY_RECV/SEND were swapped so that value
1 corresponds to recv and 2 to send, as is done almost everywhere else
in the code and in the shutdown() call.
This commit is contained in:
Willy Tarreau 2018-12-19 13:59:17 +01:00
parent b61481c710
commit 4f6516d677
9 changed files with 128 additions and 125 deletions

View File

@ -650,11 +650,11 @@ static inline const struct conn_stream *cs_get_first(const struct connection *co
static inline void conn_force_unsubscribe(struct connection *conn)
{
if (conn->recv_wait) {
conn->recv_wait->wait_reason &= ~SUB_CAN_RECV;
conn->recv_wait->events &= ~SUB_RETRY_RECV;
conn->recv_wait = NULL;
}
if (conn->send_wait) {
conn->send_wait->wait_reason &= ~SUB_CAN_SEND;
conn->send_wait->events &= ~SUB_RETRY_SEND;
conn->send_wait = NULL;
}

View File

@ -130,7 +130,7 @@ static inline int si_reset(struct stream_interface *si)
return -1;
si->wait_event.task->process = si_cs_io_cb;
si->wait_event.task->context = si;
si->wait_event.wait_reason = 0;
si->wait_event.events = 0;
return 0;
}
@ -169,8 +169,8 @@ static inline void si_release_endpoint(struct stream_interface *si)
return;
if ((cs = objt_cs(si->end))) {
if (si->wait_event.wait_reason != 0)
cs->conn->mux->unsubscribe(cs, si->wait_event.wait_reason,
if (si->wait_event.events != 0)
cs->conn->mux->unsubscribe(cs, si->wait_event.events,
&si->wait_event);
cs_destroy(cs);
}
@ -461,7 +461,7 @@ static inline int si_sync_recv(struct stream_interface *si)
if (!cs)
return 0; // only conn_streams are supported
if (si->wait_event.wait_reason & SUB_CAN_RECV)
if (si->wait_event.events & SUB_RETRY_RECV)
return 0; // already subscribed
if (!si_rx_endp_ready(si) || si_rx_blocked(si))

View File

@ -47,16 +47,19 @@ struct server;
struct session;
struct pipe;
/* Note: subscribing to these events is only valid after the caller has really
* attempted to perform the operation, and failed to proceed or complete.
*/
enum sub_event_type {
SUB_CAN_SEND = 0x00000001, /* Schedule the tasklet when we can send more */
SUB_CAN_RECV = 0x00000002, /* Schedule the tasklet when we can recv more */
SUB_RETRY_RECV = 0x00000001, /* Schedule the tasklet when we can attempt to recv again */
SUB_RETRY_SEND = 0x00000002, /* Schedule the tasklet when we can attempt to send again */
SUB_CALL_UNSUBSCRIBE = 0x00000004, /* The mux wants its unsubscribe() method to be called before destruction of the underlying object */
};
struct wait_event {
struct tasklet *task;
void *handle; /* To be used by the callee */
int wait_reason;
int events; /* set of enum sub_event_type above */
};
/* A connection handle is how we differentiate two connections on the lower

View File

@ -712,9 +712,9 @@ static struct task *event_srv_chk_io(struct task *t, void *ctx, unsigned short s
struct check *check = ctx;
struct conn_stream *cs = check->cs;
if (!(check->wait_list.wait_reason & SUB_CAN_SEND))
if (!(check->wait_list.events & SUB_RETRY_SEND))
wake_srv_chk(cs);
if (!(check->wait_list.wait_reason & SUB_CAN_RECV)) {
if (!(check->wait_list.events & SUB_RETRY_RECV)) {
HA_SPIN_LOCK(SERVER_LOCK, &check->server->lock);
__event_srv_chk_r(cs);
HA_SPIN_UNLOCK(SERVER_LOCK, &check->server->lock);
@ -739,7 +739,7 @@ static void __event_srv_chk_w(struct conn_stream *cs)
goto out_wakeup;
if (conn->flags & CO_FL_HANDSHAKE) {
cs->conn->mux->subscribe(cs, SUB_CAN_SEND, &check->wait_list);
cs->conn->mux->subscribe(cs, SUB_RETRY_SEND, &check->wait_list);
goto out;
}
@ -773,7 +773,7 @@ static void __event_srv_chk_w(struct conn_stream *cs)
goto out_wakeup;
}
if (b_data(&check->bo)) {
conn->mux->subscribe(cs, SUB_CAN_SEND, &check->wait_list);
conn->mux->subscribe(cs, SUB_RETRY_SEND, &check->wait_list);
goto out;
}
}
@ -824,7 +824,7 @@ static void __event_srv_chk_r(struct conn_stream *cs)
goto out_wakeup;
if (conn->flags & CO_FL_HANDSHAKE) {
cs->conn->mux->subscribe(cs, SUB_CAN_RECV, &check->wait_list);
cs->conn->mux->subscribe(cs, SUB_RETRY_RECV, &check->wait_list);
goto out;
}
@ -1380,7 +1380,7 @@ out:
return;
wait_more_data:
cs->conn->mux->subscribe(cs, SUB_CAN_RECV, &check->wait_list);
cs->conn->mux->subscribe(cs, SUB_RETRY_RECV, &check->wait_list);
goto out;
}
@ -1403,7 +1403,7 @@ static int wake_srv_chk(struct conn_stream *cs)
ret = tcpcheck_main(check);
cs = check->cs;
conn = cs->conn;
} else if (!(check->wait_list.wait_reason & SUB_CAN_SEND))
} else if (!(check->wait_list.events & SUB_RETRY_SEND))
__event_srv_chk_w(cs);
if (unlikely(conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)) {
@ -1576,7 +1576,7 @@ static int connect_conn_chk(struct task *t)
return SF_ERR_RESOURCE;
conn = cs->conn;
/* Maybe there were an older connection we were waiting on */
check->wait_list.wait_reason = 0;
check->wait_list.events = 0;
if (is_addr(&check->addr)) {
/* we'll connect to the check addr specified on the server */
@ -2693,7 +2693,7 @@ static int tcpcheck_main(struct check *check)
break;
}
if (b_data(&check->bo)) {
cs->conn->mux->subscribe(cs, SUB_CAN_SEND, &check->wait_list);
cs->conn->mux->subscribe(cs, SUB_RETRY_SEND, &check->wait_list);
goto out;
}
}
@ -2755,7 +2755,7 @@ static int tcpcheck_main(struct check *check)
check->cs = cs;
conn = cs->conn;
/* Maybe there were an older connection we were waiting on */
check->wait_list.wait_reason = 0;
check->wait_list.events = 0;
conn->target = &s->obj_type;
/* no client address */
@ -2919,7 +2919,7 @@ static int tcpcheck_main(struct check *check)
}
}
else {
conn->mux->subscribe(cs, SUB_CAN_RECV, &check->wait_list);
conn->mux->subscribe(cs, SUB_RETRY_RECV, &check->wait_list);
break;
}
}
@ -3112,7 +3112,7 @@ const char *init_check(struct check *check, int type)
check->wait_list.task = tasklet_new();
if (!check->wait_list.task)
return "out of memroy while allocating check tasklet";
check->wait_list.wait_reason = 0;
check->wait_list.events = 0;
check->wait_list.task->process = event_srv_chk_io;
check->wait_list.task->context = check;
return NULL;

View File

@ -112,7 +112,7 @@ void conn_fd_handler(int fd)
*/
flags = 0;
if (conn->send_wait != NULL) {
conn->send_wait->wait_reason &= ~SUB_CAN_SEND;
conn->send_wait->events &= ~SUB_RETRY_SEND;
tasklet_wakeup(conn->send_wait->task);
conn->send_wait = NULL;
} else
@ -132,7 +132,7 @@ void conn_fd_handler(int fd)
*/
flags = 0;
if (conn->recv_wait) {
conn->recv_wait->wait_reason &= ~SUB_CAN_RECV;
conn->recv_wait->events &= ~SUB_RETRY_RECV;
tasklet_wakeup(conn->recv_wait->task);
conn->recv_wait = NULL;
} else
@ -320,19 +320,19 @@ int conn_unsubscribe(struct connection *conn, int event_type, void *param)
{
struct wait_event *sw;
if (event_type & SUB_CAN_RECV) {
if (event_type & SUB_RETRY_RECV) {
sw = param;
if (sw->wait_reason & SUB_CAN_RECV) {
if (sw->events & SUB_RETRY_RECV) {
conn->recv_wait = NULL;
sw->wait_reason &= ~SUB_CAN_RECV;
sw->events &= ~SUB_RETRY_RECV;
}
__conn_xprt_stop_recv(conn);
}
if (event_type & SUB_CAN_SEND) {
if (event_type & SUB_RETRY_SEND) {
sw = param;
if (sw->wait_reason & SUB_CAN_SEND) {
if (sw->events & SUB_RETRY_SEND) {
conn->send_wait = NULL;
sw->wait_reason &= ~SUB_CAN_SEND;
sw->events &= ~SUB_RETRY_SEND;
}
__conn_xprt_stop_send(conn);
}
@ -344,22 +344,22 @@ int conn_subscribe(struct connection *conn, int event_type, void *param)
{
struct wait_event *sw;
if (event_type & SUB_CAN_RECV) {
if (event_type & SUB_RETRY_RECV) {
sw = param;
if (!(sw->wait_reason & SUB_CAN_RECV)) {
sw->wait_reason |= SUB_CAN_RECV;
if (!(sw->events & SUB_RETRY_RECV)) {
sw->events |= SUB_RETRY_RECV;
conn->recv_wait = sw;
}
event_type &= ~SUB_CAN_RECV;
event_type &= ~SUB_RETRY_RECV;
__conn_xprt_want_recv(conn);
}
if (event_type & SUB_CAN_SEND) {
if (event_type & SUB_RETRY_SEND) {
sw = param;
if (!(sw->wait_reason & SUB_CAN_SEND)) {
sw->wait_reason |= SUB_CAN_SEND;
if (!(sw->events & SUB_RETRY_SEND)) {
sw->events |= SUB_RETRY_SEND;
conn->send_wait = sw;
}
event_type &= ~SUB_CAN_SEND;
event_type &= ~SUB_RETRY_SEND;
__conn_xprt_want_send(conn);
}
if (event_type != 0)

View File

@ -321,9 +321,9 @@ static void h1s_destroy(struct h1s *h1s)
h1c->h1s = NULL;
if (h1s->recv_wait != NULL)
h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
h1s->recv_wait->events &= ~SUB_RETRY_RECV;
if (h1s->send_wait != NULL)
h1s->send_wait->wait_reason &= ~SUB_CAN_SEND;
h1s->send_wait->events &= ~SUB_RETRY_SEND;
h1c->flags &= ~H1C_F_IN_BUSY;
h1c->flags |= H1C_F_WAIT_NEXT_REQ;
@ -370,7 +370,7 @@ static int h1_init(struct connection *conn, struct proxy *proxy, struct session
goto fail;
h1c->wait_event.task->process = h1_io_cb;
h1c->wait_event.task->context = h1c;
h1c->wait_event.wait_reason = 0;
h1c->wait_event.events = 0;
if (!(conn->flags & CO_FL_CONNECTED))
h1c->flags |= H1C_F_CS_WAIT_CONN;
@ -421,8 +421,8 @@ static void h1_release(struct connection *conn)
tasklet_free(h1c->wait_event.task);
h1s_destroy(h1c->h1s);
if (h1c->wait_event.wait_reason != 0)
conn->xprt->unsubscribe(conn, h1c->wait_event.wait_reason,
if (h1c->wait_event.events != 0)
conn->xprt->unsubscribe(conn, h1c->wait_event.events,
&h1c->wait_event);
pool_free(pool_head_h1c, h1c);
}
@ -1624,7 +1624,7 @@ static size_t h1_process_output(struct h1c *h1c, struct buffer *buf, size_t coun
static void h1_wake_stream_for_recv(struct h1s *h1s)
{
if (h1s && h1s->recv_wait) {
h1s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
h1s->recv_wait->events &= ~SUB_RETRY_RECV;
tasklet_wakeup(h1s->recv_wait->task);
h1s->recv_wait = NULL;
}
@ -1632,7 +1632,7 @@ static void h1_wake_stream_for_recv(struct h1s *h1s)
static void h1_wake_stream_for_send(struct h1s *h1s)
{
if (h1s && h1s->send_wait) {
h1s->send_wait->wait_reason &= ~SUB_CAN_SEND;
h1s->send_wait->events &= ~SUB_RETRY_SEND;
tasklet_wakeup(h1s->send_wait->task);
h1s->send_wait = NULL;
}
@ -1648,7 +1648,7 @@ static int h1_recv(struct h1c *h1c)
size_t ret = 0, max;
int rcvd = 0;
if (h1c->wait_event.wait_reason & SUB_CAN_RECV)
if (h1c->wait_event.events & SUB_RETRY_RECV)
return (b_data(&h1c->ibuf));
if (!h1_recv_allowed(h1c)) {
@ -1700,7 +1700,7 @@ static int h1_recv(struct h1c *h1c)
goto end;
}
conn->xprt->subscribe(conn, SUB_CAN_RECV, &h1c->wait_event);
conn->xprt->subscribe(conn, SUB_RETRY_RECV, &h1c->wait_event);
end:
if (ret > 0 || (conn->flags & CO_FL_ERROR) || conn_xprt_read0_pending(conn))
@ -1730,8 +1730,8 @@ static int h1_send(struct h1c *h1c)
return 0;
if (h1c->flags & H1C_F_CS_WAIT_CONN) {
if (!(h1c->wait_event.wait_reason & SUB_CAN_SEND))
conn->xprt->subscribe(conn, SUB_CAN_SEND, &h1c->wait_event);
if (!(h1c->wait_event.events & SUB_RETRY_SEND))
conn->xprt->subscribe(conn, SUB_RETRY_SEND, &h1c->wait_event);
return 0;
}
@ -1764,8 +1764,8 @@ static int h1_send(struct h1c *h1c)
if (h1c->flags & H1C_F_CS_SHUTW_NOW)
h1_shutw_conn(conn);
}
else if (!(h1c->wait_event.wait_reason & SUB_CAN_SEND))
conn->xprt->subscribe(conn, SUB_CAN_SEND, &h1c->wait_event);
else if (!(h1c->wait_event.events & SUB_RETRY_SEND))
conn->xprt->subscribe(conn, SUB_RETRY_SEND, &h1c->wait_event);
return sent;
}
@ -1832,9 +1832,9 @@ static struct task *h1_io_cb(struct task *t, void *ctx, unsigned short status)
struct h1c *h1c = ctx;
int ret = 0;
if (!(h1c->wait_event.wait_reason & SUB_CAN_SEND))
if (!(h1c->wait_event.events & SUB_RETRY_SEND))
ret = h1_send(h1c);
if (!(h1c->wait_event.wait_reason & SUB_CAN_RECV))
if (!(h1c->wait_event.events & SUB_RETRY_RECV))
ret |= h1_recv(h1c);
if (ret || !h1c->h1s)
h1_process(h1c);
@ -2052,17 +2052,17 @@ static int h1_unsubscribe(struct conn_stream *cs, int event_type, void *param)
if (!h1s)
return 0;
if (event_type & SUB_CAN_RECV) {
if (event_type & SUB_RETRY_RECV) {
sw = param;
if (h1s->recv_wait == sw) {
sw->wait_reason &= ~SUB_CAN_RECV;
sw->events &= ~SUB_RETRY_RECV;
h1s->recv_wait = NULL;
}
}
if (event_type & SUB_CAN_SEND) {
if (event_type & SUB_RETRY_SEND) {
sw = param;
if (h1s->send_wait == sw) {
sw->wait_reason &= ~SUB_CAN_SEND;
sw->events &= ~SUB_RETRY_SEND;
h1s->send_wait = NULL;
}
}
@ -2079,18 +2079,18 @@ static int h1_subscribe(struct conn_stream *cs, int event_type, void *param)
return -1;
switch (event_type) {
case SUB_CAN_RECV:
case SUB_RETRY_RECV:
sw = param;
if (!(sw->wait_reason & SUB_CAN_RECV)) {
sw->wait_reason |= SUB_CAN_RECV;
if (!(sw->events & SUB_RETRY_RECV)) {
sw->events |= SUB_RETRY_RECV;
sw->handle = h1s;
h1s->recv_wait = sw;
}
return 0;
case SUB_CAN_SEND:
case SUB_RETRY_SEND:
sw = param;
if (!(sw->wait_reason & SUB_CAN_SEND)) {
sw->wait_reason |= SUB_CAN_SEND;
if (!(sw->events & SUB_RETRY_SEND)) {
sw->events |= SUB_RETRY_SEND;
sw->handle = h1s;
h1s->send_wait = sw;
}
@ -2115,7 +2115,7 @@ static size_t h1_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
h1s->flags |= H1S_F_BUF_FLUSH;
else if (ret > 0 || (h1s->flags & H1S_F_SPLICED_DATA)) {
h1s->flags &= ~H1S_F_SPLICED_DATA;
if (!(h1c->wait_event.wait_reason & SUB_CAN_RECV))
if (!(h1c->wait_event.events & SUB_RETRY_RECV))
tasklet_wakeup(h1c->wait_event.task);
}
return ret;
@ -2187,8 +2187,8 @@ static int h1_snd_pipe(struct conn_stream *cs, struct pipe *pipe)
ret = cs->conn->xprt->snd_pipe(cs->conn, pipe);
end:
if (pipe->data) {
if (!(h1s->h1c->wait_event.wait_reason & SUB_CAN_SEND))
cs->conn->xprt->subscribe(cs->conn, SUB_CAN_SEND, &h1s->h1c->wait_event);
if (!(h1s->h1c->wait_event.events & SUB_RETRY_SEND))
cs->conn->xprt->subscribe(cs->conn, SUB_RETRY_SEND, &h1s->h1c->wait_event);
}
return ret;
}

View File

@ -416,7 +416,7 @@ static int h2_init(struct connection *conn, struct proxy *prx, struct session *s
goto fail;
h2c->wait_event.task->process = h2_io_cb;
h2c->wait_event.task->context = h2c;
h2c->wait_event.wait_reason = 0;
h2c->wait_event.events = 0;
h2c->ddht = hpack_dht_alloc(h2_settings_header_table_size);
if (!h2c->ddht)
@ -535,8 +535,8 @@ static void h2_release(struct connection *conn)
}
if (h2c->wait_event.task)
tasklet_free(h2c->wait_event.task);
if (h2c->wait_event.wait_reason != 0)
conn->xprt->unsubscribe(conn, h2c->wait_event.wait_reason,
if (h2c->wait_event.events != 0)
conn->xprt->unsubscribe(conn, h2c->wait_event.events,
&h2c->wait_event);
pool_free(pool_head_h2c, h2c);
@ -703,9 +703,9 @@ static void h2s_destroy(struct h2s *h2s)
offer_buffers(NULL, tasks_run_queue);
}
if (h2s->send_wait != NULL)
h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
h2s->send_wait->events &= ~SUB_RETRY_SEND;
if (h2s->recv_wait != NULL)
h2s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
h2s->recv_wait->events &= ~SUB_RETRY_RECV;
/* There's no need to explicitly call unsubscribe here, the only
* reference left would be in the h2c send_list/fctl_list, and if
* we're in it, we're getting out anyway
@ -740,7 +740,7 @@ static struct h2s *h2s_new(struct h2c *h2c, int id)
h2s->wait_event.task->process = h2_deferred_shut;
h2s->wait_event.task->context = h2s;
h2s->wait_event.handle = NULL;
h2s->wait_event.wait_reason = 0;
h2s->wait_event.events = 0;
LIST_INIT(&h2s->list);
h2s->h2c = h2c;
h2s->cs = NULL;
@ -1265,7 +1265,7 @@ static void h2_wake_some_streams(struct h2c *h2c, int last, uint32_t flags)
if (h2s->recv_wait) {
struct wait_event *sw = h2s->recv_wait;
sw->wait_reason &= ~SUB_CAN_RECV;
sw->events &= ~SUB_RETRY_RECV;
tasklet_wakeup(sw->task);
h2s->recv_wait = NULL;
} else if (h2s->cs->data_cb->wake != NULL)
@ -1750,7 +1750,7 @@ static int h2c_handle_rst_stream(struct h2c *h2c, struct h2s *h2s)
if (h2s->recv_wait) {
struct wait_event *sw = h2s->recv_wait;
sw->wait_reason &= ~SUB_CAN_RECV;
sw->events &= ~SUB_RETRY_RECV;
tasklet_wakeup(sw->task);
h2s->recv_wait = NULL;
}
@ -2106,7 +2106,7 @@ static void h2_process_demux(struct h2c *h2c)
/* we may have to signal the upper layers */
h2s->cs->flags |= CS_FL_RCV_MORE;
if (h2s->recv_wait) {
h2s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
h2s->recv_wait->events &= ~SUB_RETRY_RECV;
tasklet_wakeup(h2s->recv_wait->task);
h2s->recv_wait = NULL;
}
@ -2346,7 +2346,7 @@ static void h2_process_demux(struct h2c *h2c)
/* we may have to signal the upper layers */
h2s->cs->flags |= CS_FL_RCV_MORE;
if (h2s->recv_wait) {
h2s->recv_wait->wait_reason &= ~SUB_CAN_RECV;
h2s->recv_wait->events &= ~SUB_RETRY_RECV;
tasklet_wakeup(h2s->recv_wait->task);
h2s->recv_wait = NULL;
}
@ -2397,8 +2397,8 @@ static int h2_process_mux(struct h2c *h2c)
break;
h2s->flags &= ~H2_SF_BLK_ANY;
h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE;
h2s->send_wait->events &= ~SUB_RETRY_SEND;
h2s->send_wait->events |= SUB_CALL_UNSUBSCRIBE;
tasklet_wakeup(h2s->send_wait->task);
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
@ -2410,8 +2410,8 @@ static int h2_process_mux(struct h2c *h2c)
break;
h2s->flags &= ~H2_SF_BLK_ANY;
h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE;
h2s->send_wait->events &= ~SUB_RETRY_SEND;
h2s->send_wait->events |= SUB_CALL_UNSUBSCRIBE;
tasklet_wakeup(h2s->send_wait->task);
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
@ -2445,7 +2445,7 @@ static int h2_recv(struct h2c *h2c)
int max;
size_t ret;
if (h2c->wait_event.wait_reason & SUB_CAN_RECV)
if (h2c->wait_event.events & SUB_RETRY_RECV)
return (b_data(&h2c->dbuf));
if (!h2_recv_allowed(h2c))
@ -2481,7 +2481,7 @@ static int h2_recv(struct h2c *h2c)
} while (ret > 0);
if (h2_recv_allowed(h2c) && (b_data(buf) < buf->size))
conn->xprt->subscribe(conn, SUB_CAN_RECV, &h2c->wait_event);
conn->xprt->subscribe(conn, SUB_RETRY_RECV, &h2c->wait_event);
if (!b_data(buf)) {
h2_release_buf(h2c, &h2c->dbuf);
@ -2571,8 +2571,8 @@ static int h2_send(struct h2c *h2c)
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
LIST_ADDQ(&h2c->sending_list, &h2s->list);
h2s->send_wait->wait_reason &= ~SUB_CAN_SEND;
h2s->send_wait->wait_reason |= SUB_CALL_UNSUBSCRIBE;
h2s->send_wait->events &= ~SUB_RETRY_SEND;
h2s->send_wait->events |= SUB_CALL_UNSUBSCRIBE;
tasklet_wakeup(h2s->send_wait->task);
}
}
@ -2580,8 +2580,8 @@ static int h2_send(struct h2c *h2c)
if (!b_data(&h2c->mbuf))
return sent;
schedule:
if (!(h2c->wait_event.wait_reason & SUB_CAN_SEND))
conn->xprt->subscribe(conn, SUB_CAN_SEND, &h2c->wait_event);
if (!(h2c->wait_event.events & SUB_RETRY_SEND))
conn->xprt->subscribe(conn, SUB_RETRY_SEND, &h2c->wait_event);
return sent;
}
@ -2590,9 +2590,9 @@ static struct task *h2_io_cb(struct task *t, void *ctx, unsigned short status)
struct h2c *h2c = ctx;
int ret = 0;
if (!(h2c->wait_event.wait_reason & SUB_CAN_SEND))
if (!(h2c->wait_event.events & SUB_RETRY_SEND))
ret = h2_send(h2c);
if (!(h2c->wait_event.wait_reason & SUB_CAN_RECV))
if (!(h2c->wait_event.events & SUB_RETRY_RECV))
ret |= h2_recv(h2c);
if (ret || b_data(&h2c->dbuf))
h2_process(h2c);
@ -2647,7 +2647,7 @@ static int h2_process(struct h2c *h2c)
if ((h2s->cs->flags & CS_FL_WAIT_FOR_HS) &&
h2s->recv_wait) {
struct wait_event *sw = h2s->recv_wait;
sw->wait_reason &= ~SUB_CAN_RECV;
sw->events &= ~SUB_RETRY_RECV;
tasklet_wakeup(sw->task);
h2s->recv_wait = NULL;
}
@ -2932,14 +2932,14 @@ static void h2_do_shutr(struct h2s *h2s)
h2c_send_goaway_error(h2c, h2s) <= 0)
return;
if (!(h2c->wait_event.wait_reason & SUB_CAN_SEND))
if (!(h2c->wait_event.events & SUB_RETRY_SEND))
tasklet_wakeup(h2c->wait_event.task);
h2s_close(h2s);
return;
add_to_list:
if (LIST_ISEMPTY(&h2s->list)) {
sw->wait_reason |= SUB_CAN_SEND;
sw->events |= SUB_RETRY_SEND;
if (h2s->flags & H2_SF_BLK_MFCTL) {
LIST_ADDQ(&h2c->fctl_list, &h2s->list);
h2s->send_wait = sw;
@ -2990,13 +2990,13 @@ static void h2_do_shutw(struct h2s *h2s)
h2s_close(h2s);
}
if (!(h2c->wait_event.wait_reason & SUB_CAN_SEND))
if (!(h2c->wait_event.events & SUB_RETRY_SEND))
tasklet_wakeup(h2c->wait_event.task);
return;
add_to_list:
if (LIST_ISEMPTY(&h2s->list)) {
sw->wait_reason |= SUB_CAN_SEND;
sw->events |= SUB_RETRY_SEND;
if (h2s->flags & H2_SF_BLK_MFCTL) {
LIST_ADDQ(&h2c->fctl_list, &h2s->list);
h2s->send_wait = sw;
@ -3016,7 +3016,7 @@ static struct task *h2_deferred_shut(struct task *t, void *ctx, unsigned short s
long reason = (long)h2s->wait_event.handle;
if (h2s->send_wait) {
h2s->send_wait->wait_reason &= ~SUB_CALL_UNSUBSCRIBE;
h2s->send_wait->events &= ~SUB_CALL_UNSUBSCRIBE;
h2s->send_wait = NULL;
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
@ -4526,19 +4526,19 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
struct h2s *h2s = cs->ctx;
struct h2c *h2c = h2s->h2c;
if (event_type & SUB_CAN_RECV) {
if (event_type & SUB_RETRY_RECV) {
sw = param;
if (!(sw->wait_reason & SUB_CAN_RECV)) {
sw->wait_reason |= SUB_CAN_RECV;
if (!(sw->events & SUB_RETRY_RECV)) {
sw->events |= SUB_RETRY_RECV;
sw->handle = h2s;
h2s->recv_wait = sw;
}
event_type &= ~SUB_CAN_RECV;
event_type &= ~SUB_RETRY_RECV;
}
if (event_type & SUB_CAN_SEND) {
if (event_type & SUB_RETRY_SEND) {
sw = param;
if (!(sw->wait_reason & SUB_CAN_SEND)) {
sw->wait_reason |= SUB_CAN_SEND;
if (!(sw->events & SUB_RETRY_SEND)) {
sw->events |= SUB_RETRY_SEND;
sw->handle = h2s;
h2s->send_wait = sw;
if (!(h2s->flags & H2_SF_BLK_SFCTL)) {
@ -4548,7 +4548,7 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
LIST_ADDQ(&h2c->send_list, &h2s->list);
}
}
event_type &= ~SUB_CAN_SEND;
event_type &= ~SUB_RETRY_SEND;
}
if (event_type != 0)
return -1;
@ -4562,26 +4562,26 @@ static int h2_unsubscribe(struct conn_stream *cs, int event_type, void *param)
struct wait_event *sw;
struct h2s *h2s = cs->ctx;
if (event_type & SUB_CAN_RECV) {
if (event_type & SUB_RETRY_RECV) {
sw = param;
if (h2s->recv_wait == sw) {
sw->wait_reason &= ~SUB_CAN_RECV;
sw->events &= ~SUB_RETRY_RECV;
h2s->recv_wait = NULL;
}
}
if (event_type & SUB_CAN_SEND) {
if (event_type & SUB_RETRY_SEND) {
sw = param;
if (h2s->send_wait == sw) {
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
sw->wait_reason &= ~SUB_CAN_SEND;
sw->events &= ~SUB_RETRY_SEND;
h2s->send_wait = NULL;
}
}
if (event_type & SUB_CALL_UNSUBSCRIBE) {
sw = param;
if (h2s->send_wait == sw) {
sw->wait_reason &= ~SUB_CALL_UNSUBSCRIBE;
sw->events &= ~SUB_CALL_UNSUBSCRIBE;
h2s->send_wait = NULL;
}
}
@ -4642,7 +4642,7 @@ static size_t h2_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
if (ret && h2c->dsi == h2s->id) {
/* demux is blocking on this stream's buffer */
h2c->flags &= ~H2_CF_DEM_SFULL;
if (b_data(&h2c->dbuf) || !(h2c->wait_event.wait_reason & SUB_CAN_RECV)) {
if (b_data(&h2c->dbuf) || !(h2c->wait_event.events & SUB_RETRY_RECV)) {
if (h2_recv_allowed(h2c))
tasklet_wakeup(h2c->wait_event.task);
}
@ -4662,8 +4662,8 @@ static void h2_stop_senders(struct h2c *h2c)
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
task_remove_from_task_list((struct task *)h2s->send_wait->task);
h2s->send_wait->wait_reason |= SUB_CAN_SEND;
h2s->send_wait->wait_reason &= ~SUB_CALL_UNSUBSCRIBE;
h2s->send_wait->events |= SUB_RETRY_SEND;
h2s->send_wait->events &= ~SUB_CALL_UNSUBSCRIBE;
LIST_ADD(&h2c->send_list, &h2s->list);
}
}
@ -4682,7 +4682,7 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
int32_t idx;
if (h2s->send_wait) {
h2s->send_wait->wait_reason &= ~SUB_CALL_UNSUBSCRIBE;
h2s->send_wait->events &= ~SUB_CALL_UNSUBSCRIBE;
h2s->send_wait = NULL;
LIST_DEL(&h2s->list);
LIST_INIT(&h2s->list);
@ -4844,7 +4844,7 @@ static size_t h2_snd_buf(struct conn_stream *cs, struct buffer *buf, size_t coun
total = orig_count;
if (total > 0) {
if (!(h2s->h2c->wait_event.wait_reason & SUB_CAN_SEND))
if (!(h2s->h2c->wait_event.events & SUB_RETRY_SEND))
tasklet_wakeup(h2s->h2c->wait_event.task);
}
@ -4886,7 +4886,7 @@ static void h2_show_fd(struct buffer *msg, struct connection *conn)
" .orph_cnt=%d .sub=%d .dsi=%d .dbuf=%u@%p+%u/%u .msi=%d .mbuf=%u@%p+%u/%u",
h2c->st0, h2c->errcode, h2c->max_id, h2c->last_sid, h2c->flags,
h2c->nb_streams, h2c->nb_cs, fctl_cnt, send_cnt, tree_cnt, orph_cnt,
h2c->wait_event.wait_reason, h2c->dsi,
h2c->wait_event.events, h2c->dsi,
(unsigned int)b_data(&h2c->dbuf), b_orig(&h2c->dbuf),
(unsigned int)b_head_ofs(&h2c->dbuf), (unsigned int)b_size(&h2c->dbuf),
h2c->msi,

View File

@ -52,7 +52,7 @@ static struct task *mux_pt_io_cb(struct task *t, void *tctx, unsigned short stat
if (ctx->conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))
mux_pt_destroy(ctx);
else
ctx->conn->xprt->subscribe(ctx->conn, SUB_CAN_RECV,
ctx->conn->xprt->subscribe(ctx->conn, SUB_RETRY_RECV,
&ctx->wait_event);
return NULL;
@ -76,7 +76,7 @@ static int mux_pt_init(struct connection *conn, struct proxy *prx, struct sessio
goto fail_free_ctx;
ctx->wait_event.task->context = ctx;
ctx->wait_event.task->process = mux_pt_io_cb;
ctx->wait_event.wait_reason = 0;
ctx->wait_event.events = 0;
ctx->conn = conn;
if (!cs) {
@ -143,7 +143,7 @@ static struct conn_stream *mux_pt_attach(struct connection *conn, struct session
struct conn_stream *cs;
struct mux_pt_ctx *ctx = conn->mux_ctx;
conn->xprt->unsubscribe(conn, SUB_CAN_RECV, &ctx->wait_event);
conn->xprt->unsubscribe(conn, SUB_RETRY_RECV, &ctx->wait_event);
cs = cs_new(conn);
if (!cs)
goto fail;
@ -187,7 +187,7 @@ static void mux_pt_detach(struct conn_stream *cs)
if (conn->owner != NULL &&
!(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_SOCK_WR_SH))) {
ctx->cs = NULL;
conn->xprt->subscribe(conn, SUB_CAN_RECV, &ctx->wait_event);
conn->xprt->subscribe(conn, SUB_RETRY_RECV, &ctx->wait_event);
} else
/* There's no session attached to that connection, destroy it */
mux_pt_destroy(ctx);

View File

@ -545,7 +545,7 @@ static int si_cs_process(struct conn_stream *cs)
struct channel *oc = si_oc(si);
/* If we have data to send, try it now */
if (!channel_is_empty(oc) && !(si->wait_event.wait_reason & SUB_CAN_SEND))
if (!channel_is_empty(oc) && !(si->wait_event.events & SUB_RETRY_SEND))
si_cs_send(cs);
/* First step, report to the stream-int what was detected at the
@ -596,7 +596,7 @@ int si_cs_send(struct conn_stream *cs)
int did_send = 0;
/* We're already waiting to be able to send, give up */
if (si->wait_event.wait_reason & SUB_CAN_SEND)
if (si->wait_event.events & SUB_RETRY_SEND)
return 0;
if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR)
@ -680,7 +680,7 @@ int si_cs_send(struct conn_stream *cs)
end:
/* We couldn't send all of our data, let the mux know we'd like to send more */
if (!channel_is_empty(oc))
conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_event);
conn->mux->subscribe(cs, SUB_RETRY_SEND, &si->wait_event);
return did_send;
}
@ -698,9 +698,9 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state)
if (!cs)
return NULL;
if (!(si->wait_event.wait_reason & SUB_CAN_SEND) && !channel_is_empty(si_oc(si)))
if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
ret = si_cs_send(cs);
if (!(si->wait_event.wait_reason & SUB_CAN_RECV))
if (!(si->wait_event.events & SUB_RETRY_RECV))
ret |= si_cs_recv(cs);
if (ret != 0)
si_cs_process(cs);
@ -1004,7 +1004,7 @@ static void stream_int_chk_snd_conn(struct stream_interface *si)
!(si->flags & SI_FL_WAIT_DATA)) /* not waiting for data */
return;
if (!(si->wait_event.wait_reason & SUB_CAN_SEND) && !channel_is_empty(si_oc(si)))
if (!(si->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(si_oc(si)))
si_cs_send(cs);
if (cs->flags & CS_FL_ERROR || cs->conn->flags & CO_FL_ERROR) {
@ -1093,7 +1093,7 @@ int si_cs_recv(struct conn_stream *cs)
/* If another call to si_cs_recv() failed, and we subscribed to
* recv events already, give up now.
*/
if (si->wait_event.wait_reason & SUB_CAN_RECV)
if (si->wait_event.events & SUB_RETRY_RECV)
return 0;
/* maybe we were called immediately after an asynchronous shutr */
@ -1335,7 +1335,7 @@ int si_cs_recv(struct conn_stream *cs)
/* Subscribe to receive events if we're blocking on I/O */
if (!si_rx_blocked(si)) {
conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_event);
conn->mux->subscribe(cs, SUB_RETRY_RECV, &si->wait_event);
si_rx_endp_done(si);
} else {
si_rx_endp_more(si);