MINOR: conn-stream: Move si_conn_cb in the conn-stream scope

The si_conn_cb variable is renamed cs_data_conn_cb. In addition, its associated
functions are also renamed: si_cs_recv(), si_cs_send() and si_cs_process()
become cs_conn_recv(), cs_conn_send() and cs_conn_process(). These functions
are updated to manipulate conn-streams instead of stream-interfaces.
Christopher Faulet 2022-04-01 17:06:32 +02:00
parent 431ce2e3c1
commit 000ba3e613
3 changed files with 64 additions and 66 deletions
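
As a quick reference while reading the diff, the renaming boils down to the
following declarations (a minimal sketch; the authoritative hunks are below):

    /* wake callback descriptor used for connection-based conn-streams */
    extern struct data_cb cs_data_conn_cb;          /* was: si_conn_cb */

    int cs_conn_recv(struct conn_stream *cs);       /* was: si_cs_recv()    */
    int cs_conn_send(struct conn_stream *cs);       /* was: si_cs_send()    */
    int cs_conn_process(struct conn_stream *cs);    /* was: si_cs_process() */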

View File

@@ -29,7 +29,7 @@
 #include <haproxy/conn_stream.h>
 #include <haproxy/obj_type.h>
 
-extern struct data_cb si_conn_cb;
+extern struct data_cb cs_data_conn_cb;
 extern struct data_cb cs_data_applet_cb;
 extern struct data_cb check_conn_cb;
 
@@ -45,9 +45,9 @@ void cs_conn_sync_send(struct conn_stream *cs);
 /* Functions used to communicate with a conn_stream. The first two may be used
  * directly, the last one is mostly a wake callback.
  */
-int si_cs_recv(struct conn_stream *cs);
-int si_cs_send(struct conn_stream *cs);
-int si_cs_process(struct conn_stream *cs);
+int cs_conn_recv(struct conn_stream *cs);
+int cs_conn_send(struct conn_stream *cs);
+int cs_conn_process(struct conn_stream *cs);
 
 /* returns the channel which receives data from this stream interface (input channel) */
 static inline struct channel *si_ic(struct stream_interface *si)

View File

@@ -233,7 +233,7 @@ int cs_attach_mux(struct conn_stream *cs, void *target, void *ctx)
 		}
 		cs->ops = &cs_app_conn_ops;
-		cs->data_cb = &si_conn_cb;
+		cs->data_cb = &cs_data_conn_cb;
 	}
 	else if (cs_check(cs))
 		cs->data_cb = &check_conn_cb;
@@ -278,7 +278,7 @@ int cs_attach_strm(struct conn_stream *cs, struct stream *strm)
 		cs->wait_event.events = 0;
 		cs->ops = &cs_app_conn_ops;
-		cs->data_cb = &si_conn_cb;
+		cs->data_cb = &cs_data_conn_cb;
 	}
 	else if (cs->endp->flags & CS_EP_T_APPLET) {
 		cs->ops = &cs_app_applet_ops;
@@ -728,7 +728,7 @@ static void cs_app_chk_snd_conn(struct conn_stream *cs)
 		return;
 
 	if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(cs_oc(cs)))
-		si_cs_send(cs);
+		cs_conn_send(cs);
 
 	if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(cs->si)) {
 		/* Write error on the file descriptor */

View File

@@ -47,17 +47,18 @@ static void cs_conn_read0(struct conn_stream *cs);
 /* post-IO notification callback */
 static void cs_notify(struct conn_stream *cs);
 
-struct data_cb si_conn_cb = {
-	.wake = si_cs_process,
+struct data_cb cs_data_conn_cb = {
+	.wake = cs_conn_process,
 	.name = "STRM",
 };
 
 struct data_cb cs_data_applet_cb = {
 	.wake = cs_applet_process,
 	.name = "STRM",
 };
 
 struct stream_interface *si_new(struct conn_stream *cs)
 {
 	struct stream_interface *si;
@@ -222,18 +223,17 @@ static void cs_notify(struct conn_stream *cs)
  * connection's polling based on the channels and stream interface's final
  * states. The function always returns 0.
  */
-int si_cs_process(struct conn_stream *cs)
+int cs_conn_process(struct conn_stream *cs)
 {
 	struct connection *conn = __cs_conn(cs);
-	struct stream_interface *si = cs_si(cs);
-	struct channel *ic = si_ic(si);
-	struct channel *oc = si_oc(si);
+	struct channel *ic = cs_ic(cs);
+	struct channel *oc = cs_oc(cs);
 
 	BUG_ON(!conn);
 
 	/* If we have data to send, try it now */
-	if (!channel_is_empty(oc) && !(si->cs->wait_event.events & SUB_RETRY_SEND))
-		si_cs_send(cs);
+	if (!channel_is_empty(oc) && !(cs->wait_event.events & SUB_RETRY_SEND))
+		cs_conn_send(cs);
 
 	/* First step, report to the conn-stream what was detected at the
 	 * connection layer : errors and connection establishment.
@@ -243,13 +243,13 @@ int si_cs_process(struct conn_stream *cs)
 	 * to retry to connect, the connection may still have CO_FL_ERROR,
 	 * and we don't want to add CS_EP_ERROR back
 	 *
-	 * Note: This test is only required because si_cs_process is also the SI
-	 * wake callback. Otherwise si_cs_recv()/si_cs_send() already take
+	 * Note: This test is only required because cs_conn_process is also the SI
+	 * wake callback. Otherwise cs_conn_recv()/cs_conn_send() already take
 	 * care of it.
 	 */
-	if (si->cs->state >= CS_ST_CON) {
-		if (si_is_conn_error(si))
+	if (cs->state >= CS_ST_CON) {
+		if (si_is_conn_error(cs->si))
 			cs->endp->flags |= CS_EP_ERROR;
 	}
@@ -261,22 +261,22 @@ int si_cs_process(struct conn_stream *cs)
 	if (!(conn->flags & (CO_FL_WAIT_XPRT | CO_FL_EARLY_SSL_HS)) &&
 	    (cs->endp->flags & CS_EP_WAIT_FOR_HS)) {
 		cs->endp->flags &= ~CS_EP_WAIT_FOR_HS;
-		task_wakeup(si_task(si), TASK_WOKEN_MSG);
+		task_wakeup(cs_strm_task(cs), TASK_WOKEN_MSG);
 	}
 
-	if (!cs_state_in(si->cs->state, CS_SB_EST|CS_SB_DIS|CS_SB_CLO) &&
+	if (!cs_state_in(cs->state, CS_SB_EST|CS_SB_DIS|CS_SB_CLO) &&
 	    (conn->flags & CO_FL_WAIT_XPRT) == 0) {
 		__cs_strm(cs)->conn_exp = TICK_ETERNITY;
 		oc->flags |= CF_WRITE_NULL;
-		if (si->cs->state == CS_ST_CON)
-			si->cs->state = CS_ST_RDY;
+		if (cs->state == CS_ST_CON)
+			cs->state = CS_ST_RDY;
 	}
 
 	/* Report EOS on the channel if it was reached from the mux point of
 	 * view.
 	 *
-	 * Note: This test is only required because si_cs_process is also the SI
-	 * wake callback. Otherwise si_cs_recv()/si_cs_send() already take
+	 * Note: This test is only required because cs_conn_process is also the SI
+	 * wake callback. Otherwise cs_conn_recv()/cs_conn_send() already take
 	 * care of it.
 	 */
 	if (cs->endp->flags & CS_EP_EOS && !(ic->flags & CF_SHUTR)) {
@@ -290,8 +290,8 @@ int si_cs_process(struct conn_stream *cs)
 	/* Report EOI on the channel if it was reached from the mux point of
 	 * view.
 	 *
-	 * Note: This test is only required because si_cs_process is also the SI
-	 * wake callback. Otherwise si_cs_recv()/si_cs_send() already take
+	 * Note: This test is only required because cs_conn_process is also the SI
+	 * wake callback. Otherwise cs_conn_recv()/cs_conn_send() already take
 	 * care of it.
 	 */
 	if ((cs->endp->flags & CS_EP_EOI) && !(ic->flags & CF_EOI))
@@ -302,7 +302,7 @@ int si_cs_process(struct conn_stream *cs)
 	 * stream-int status.
 	 */
 	cs_notify(cs);
 
-	stream_release_buffers(si_strm(si));
+	stream_release_buffers(__cs_strm(cs));
 	return 0;
 }
@@ -312,30 +312,29 @@ int si_cs_process(struct conn_stream *cs)
  * caller to commit polling changes. The caller should check conn->flags
  * for errors.
  */
-int si_cs_send(struct conn_stream *cs)
+int cs_conn_send(struct conn_stream *cs)
 {
 	struct connection *conn = __cs_conn(cs);
-	struct stream_interface *si = cs_si(cs);
-	struct stream *s = si_strm(si);
-	struct channel *oc = si_oc(si);
+	struct stream *s = __cs_strm(cs);
+	struct channel *oc = cs_oc(cs);
 	int ret;
 	int did_send = 0;
 
-	if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(si)) {
+	if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING) || si_is_conn_error(cs->si)) {
 		/* We're probably there because the tasklet was woken up,
 		 * but process_stream() ran before, detected there were an
 		 * error and put the si back to CS_ST_TAR. There's still
 		 * CO_FL_ERROR on the connection but we don't want to add
 		 * CS_EP_ERROR back, so give up
 		 */
-		if (si->cs->state < CS_ST_CON)
+		if (cs->state < CS_ST_CON)
 			return 0;
 		cs->endp->flags |= CS_EP_ERROR;
 		return 1;
 	}
 
 	/* We're already waiting to be able to send, give up */
-	if (si->cs->wait_event.events & SUB_RETRY_SEND)
+	if (cs->wait_event.events & SUB_RETRY_SEND)
 		return 0;
 
 	/* we might have been called just after an asynchronous shutw */
@@ -383,7 +382,7 @@ int si_cs_send(struct conn_stream *cs)
 	if ((!(oc->flags & (CF_NEVER_WAIT|CF_SEND_DONTWAIT)) &&
 	     ((oc->to_forward && oc->to_forward != CHN_INFINITE_FORWARD) ||
 	      (oc->flags & CF_EXPECT_MORE) ||
-	      (IS_HTX_STRM(si_strm(si)) &&
+	      (IS_HTX_STRM(s) &&
 	       (!(oc->flags & (CF_EOI|CF_SHUTR)) && htx_expect_more(htxbuf(&oc->buf)))))) ||
 	    ((oc->flags & CF_ISRESP) &&
 	     ((oc->flags & (CF_AUTO_CLOSE|CF_SHUTW_NOW)) == (CF_AUTO_CLOSE|CF_SHUTW_NOW))))
@@ -437,10 +436,10 @@ int si_cs_send(struct conn_stream *cs)
 end:
 	if (did_send) {
 		oc->flags |= CF_WRITE_PARTIAL | CF_WROTE_DATA;
-		if (si->cs->state == CS_ST_CON)
-			si->cs->state = CS_ST_RDY;
+		if (cs->state == CS_ST_CON)
+			cs->state = CS_ST_RDY;
 
-		si_rx_room_rdy(si_opposite(si));
+		si_rx_room_rdy(cs_opposite(cs)->si);
 	}
 
 	if (cs->endp->flags & (CS_EP_ERROR|CS_EP_ERR_PENDING)) {
@@ -450,7 +449,7 @@ int si_cs_send(struct conn_stream *cs)
 	/* We couldn't send all of our data, let the mux know we'd like to send more */
 	if (!channel_is_empty(oc))
-		conn->mux->subscribe(cs, SUB_RETRY_SEND, &si->cs->wait_event);
+		conn->mux->subscribe(cs, SUB_RETRY_SEND, &cs->wait_event);
 
 	return did_send;
 }
@@ -468,11 +467,11 @@ struct task *cs_conn_io_cb(struct task *t, void *ctx, unsigned int state)
 		return t;
 
 	if (!(cs->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(cs_oc(cs)))
-		ret = si_cs_send(cs);
+		ret = cs_conn_send(cs);
 	if (!(cs->wait_event.events & SUB_RETRY_RECV))
-		ret |= si_cs_recv(cs);
+		ret |= cs_conn_recv(cs);
 	if (ret != 0)
-		si_cs_process(cs);
+		cs_conn_process(cs);
 
 	stream_release_buffers(__cs_strm(cs));
 	return t;
@@ -499,7 +498,7 @@ int cs_conn_sync_recv(struct conn_stream *cs)
 	if (!si_rx_endp_ready(cs->si) || si_rx_blocked(cs->si))
 		return 0; // already failed
 
-	return si_cs_recv(cs);
+	return cs_conn_recv(cs);
 }
 
 /* perform a synchronous send() for the stream interface. The CF_WRITE_NULL and
@@ -524,7 +523,7 @@ void cs_conn_sync_send(struct conn_stream *cs)
 	if (!cs_conn_mux(cs))
 		return;
 
-	si_cs_send(cs);
+	cs_conn_send(cs);
 }
 
 /*
@@ -532,11 +531,10 @@ void cs_conn_sync_send(struct conn_stream *cs)
  * into the buffer from the connection. It iterates over the mux layer's
  * rcv_buf function.
  */
-int si_cs_recv(struct conn_stream *cs)
+int cs_conn_recv(struct conn_stream *cs)
 {
 	struct connection *conn = __cs_conn(cs);
-	struct stream_interface *si = cs_si(cs);
-	struct channel *ic = si_ic(si);
+	struct channel *ic = cs_ic(cs);
 	int ret, max, cur_read = 0;
 	int read_poll = MAX_READ_POLL_LOOPS;
 	int flags = 0;
@@ -545,10 +543,10 @@ int si_cs_recv(struct conn_stream *cs)
 	if (cs->state != CS_ST_EST)
 		return 0;
 
-	/* If another call to si_cs_recv() failed, and we subscribed to
+	/* If another call to cs_conn_recv() failed, and we subscribed to
 	 * recv events already, give up now.
 	 */
-	if (si->cs->wait_event.events & SUB_RETRY_RECV)
+	if (cs->wait_event.events & SUB_RETRY_RECV)
 		return 0;
 
 	/* maybe we were called immediately after an asynchronous shutr */
@@ -636,7 +634,7 @@ int si_cs_recv(struct conn_stream *cs)
 			/* the pipe is full or we have read enough data that it
 			 * could soon be full. Let's stop before needing to poll.
 			 */
-			si_rx_room_blk(si);
+			si_rx_room_blk(cs->si);
 			goto done_recv;
 		}
@@ -657,7 +655,7 @@ int si_cs_recv(struct conn_stream *cs)
 	}
 
 	/* now we'll need a input buffer for the stream */
-	if (!si_alloc_ibuf(si, &(si_strm(si)->buffer_wait)))
+	if (!si_alloc_ibuf(cs->si, &(__cs_strm(cs)->buffer_wait)))
 		goto end_recv;
 
 	/* For an HTX stream, if the buffer is stuck (no output data with some
@@ -669,15 +667,15 @@ int si_cs_recv(struct conn_stream *cs)
 	 * NOTE: A possible optim may be to let the mux decides if defrag is
 	 * required or not, depending on amount of data to be xferred.
 	 */
-	if (IS_HTX_STRM(si_strm(si)) && !co_data(ic)) {
+	if (IS_HTX_STRM(__cs_strm(cs)) && !co_data(ic)) {
 		struct htx *htx = htxbuf(&ic->buf);
 
 		if (htx_is_not_empty(htx) && ((htx->flags & HTX_FL_FRAGMENTED) || htx_space_wraps(htx)))
-			htx_defrag(htxbuf(&ic->buf), NULL, 0);
+			htx_defrag(htx, NULL, 0);
 	}
 
 	/* Instruct the mux it must subscribed for read events */
-	flags |= ((!conn_is_back(conn) && (si_strm(si)->be->options & PR_O_ABRT_CLOSE)) ? CO_RFL_KEEP_RECV : 0);
+	flags |= ((!conn_is_back(conn) && (__cs_strm(cs)->be->options & PR_O_ABRT_CLOSE)) ? CO_RFL_KEEP_RECV : 0);
 
 	/* Important note : if we're called with POLL_IN|POLL_HUP, it means the read polling
 	 * was enabled, which implies that the recv buffer was not full. So we have a guarantee
@@ -706,7 +704,7 @@ int si_cs_recv(struct conn_stream *cs)
 			 */
 			BUG_ON(c_empty(ic));
 
-			si_rx_room_blk(si);
+			si_rx_room_blk(cs->si);
 			/* Add READ_PARTIAL because some data are pending but
 			 * cannot be xferred to the channel
 			 */
@@ -720,7 +718,7 @@ int si_cs_recv(struct conn_stream *cs)
 			 * here to proceed.
 			 */
 			if (flags & CO_RFL_BUF_FLUSH)
-				si_rx_room_blk(si);
+				si_rx_room_blk(cs->si);
 			break;
 		}
@@ -750,7 +748,7 @@ int si_cs_recv(struct conn_stream *cs)
 		if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
 			/* we're stopped by the channel's policy */
-			si_rx_chan_blk(si);
+			si_rx_chan_blk(cs->si);
 			break;
 		}
@@ -765,7 +763,7 @@ int si_cs_recv(struct conn_stream *cs)
 		 */
 		if (ic->flags & CF_STREAMER) {
 			/* we're stopped by the channel's policy */
-			si_rx_chan_blk(si);
+			si_rx_chan_blk(cs->si);
 			break;
 		}
@@ -774,7 +772,7 @@ int si_cs_recv(struct conn_stream *cs)
 		 */
 		if (ret >= global.tune.recv_enough) {
 			/* we're stopped by the channel's policy */
-			si_rx_chan_blk(si);
+			si_rx_chan_blk(cs->si);
 			break;
 		}
 	}
@@ -782,7 +780,7 @@ int si_cs_recv(struct conn_stream *cs)
 		/* if we are waiting for more space, don't try to read more data
 		 * right now.
 		 */
-		if (si_rx_blocked(si))
+		if (si_rx_blocked(cs->si))
 			break;
 	} /* while !flags */
@@ -846,12 +844,12 @@ int si_cs_recv(struct conn_stream *cs)
 		cs_conn_read0(cs);
 		ret = 1;
 	}
-	else if (!si_rx_blocked(si)) {
+	else if (!si_rx_blocked(cs->si)) {
 		/* Subscribe to receive events if we're blocking on I/O */
-		conn->mux->subscribe(cs, SUB_RETRY_RECV, &si->cs->wait_event);
-		si_rx_endp_done(si);
+		conn->mux->subscribe(cs, SUB_RETRY_RECV, &cs->wait_event);
+		si_rx_endp_done(cs->si);
 	} else {
-		si_rx_endp_more(si);
+		si_rx_endp_more(cs->si);
 		ret = 1;
 	}
 	return ret;
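
For context, a wake-up coming from the connection layer reaches the stream
through the ->wake callback registered on the conn-stream, which now points
to cs_conn_process() via cs_data_conn_cb. A hypothetical illustration (the
helper name is made up for this example and is not part of the commit):

    #include <haproxy/conn_stream.h>

    /* Wake the stream attached to this conn-stream, if a wake callback is
     * registered. After cs_attach_mux()/cs_attach_strm(), cs->data_cb is
     * &cs_data_conn_cb, so this ends up calling cs_conn_process(cs).
     */
    static inline int cs_wake_stream(struct conn_stream *cs)
    {
            return (cs->data_cb && cs->data_cb->wake) ? cs->data_cb->wake(cs) : 0;
    }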