diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index 15b276ca1..fe55288b6 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -285,6 +285,22 @@ static inline void si_done_put(struct stream_interface *si) si->flags |= SI_FL_RX_WAIT_EP; } +/* The stream interface just got the input buffer it was waiting for */ +static inline void si_rx_buff_rdy(struct stream_interface *si) +{ + si->flags &= ~SI_FL_RXBLK_BUFF; +} + +/* The stream interface failed to get an input buffer and is waiting for it. + * Since it indicates a willingness to deliver data to the buffer that will + * have to be retried, we automatically clear RXBLK_ENDP to be called again + * as soon as RXBLK_BUFF is cleared. + */ +static inline void si_rx_buff_blk(struct stream_interface *si) +{ + si->flags |= SI_FL_RXBLK_BUFF; +} + /* Returns non-zero if the stream interface's Rx path is blocked */ static inline int si_tx_blocked(const struct stream_interface *si) { @@ -342,10 +358,10 @@ static inline struct conn_stream *si_alloc_cs(struct stream_interface *si, struc /* Try to allocate a buffer for the stream-int's input channel. It relies on * channel_alloc_buffer() for this so it abides by its rules. It returns 0 on * failure, non-zero otherwise. If no buffer is available, the requester, - * represented by pointer, will be added in the list of objects waiting - * for an available buffer, and SI_FL_RXBLK_ROOM will be set on the stream-int. - * The requester will be responsible for calling this function to try again - * once woken up. + * represented by the pointer, will be added in the list of objects + * waiting for an available buffer, and SI_FL_RXBLK_BUFF will be set on the + * stream-int and SI_FL_RX_WAIT_EP cleared. The requester will be responsible + * for calling this function to try again once woken up. */ static inline int si_alloc_ibuf(struct stream_interface *si, struct buffer_wait *wait) { @@ -353,7 +369,7 @@ static inline int si_alloc_ibuf(struct stream_interface *si, struct buffer_wait ret = channel_alloc_buffer(si_ic(si), wait); if (!ret) - si_cant_put(si); + si_rx_buff_blk(si); return ret; } diff --git a/src/applet.c b/src/applet.c index 15cc5ce43..1fe453107 100644 --- a/src/applet.c +++ b/src/applet.c @@ -36,14 +36,21 @@ int appctx_buf_available(void *arg) struct stream_interface *si = appctx->owner; /* allocation requested ? */ - if (!(si->flags & SI_FL_RXBLK_ROOM) || c_size(si_ic(si)) || si_ic(si)->pipe) + if (!(si->flags & SI_FL_RXBLK_BUFF)) + return 0; + + si_rx_buff_rdy(si); + + /* was already allocated another way ? if so, don't take this one */ + if (c_size(si_ic(si)) || si_ic(si)->pipe) return 0; /* allocation possible now ? */ - if (!b_alloc_margin(&si_ic(si)->buf, global.tune.reserved_bufs)) + if (!b_alloc_margin(&si_ic(si)->buf, global.tune.reserved_bufs)) { + si_rx_buff_blk(si); return 0; + } - si->flags &= ~SI_FL_RXBLK_ROOM; task_wakeup(appctx->t, TASK_WOKEN_RES); return 1; } @@ -58,12 +65,6 @@ struct task *task_run_applet(struct task *t, void *context, unsigned short state __appctx_free(app); return NULL; } - /* Now we'll try to allocate the input buffer. We wake up the - * applet in all cases. So this is the applet responsibility to - * check if this buffer was allocated or not. This let a chance - * for applets to do some other processing if needed. */ - if (!si_alloc_ibuf(si, &app->buffer_wait)) - si_cant_put(si); /* We always pretend the applet can't get and doesn't want to * put, it's up to it to change this if needed. This ensures @@ -72,6 +73,15 @@ struct task *task_run_applet(struct task *t, void *context, unsigned short state si_cant_get(si); si_stop_put(si); + /* Now we'll try to allocate the input buffer. We wake up the applet in + * all cases. So this is the applet's responsibility to check if this + * buffer was allocated or not. This leaves a chance for applets to do + * some other processing if needed. The applet doesn't have anything to + * do if it needs the buffer, it will be called again upon readiness. + */ + if (!si_alloc_ibuf(si, &app->buffer_wait)) + si_want_put(si); + app->applet->fct(app); si_applet_wake_cb(si); channel_release_buffer(si_ic(si), &app->buffer_wait); diff --git a/src/stream.c b/src/stream.c index b7e1a0417..dec9e84b5 100644 --- a/src/stream.c +++ b/src/stream.c @@ -96,12 +96,12 @@ int stream_buf_available(void *arg) { struct stream *s = arg; - if (!s->req.buf.size && !s->req.pipe && (s->si[0].flags & SI_FL_RXBLK_ROOM) && + if (!s->req.buf.size && !s->req.pipe && (s->si[0].flags & SI_FL_RXBLK_BUFF) && b_alloc_margin(&s->req.buf, global.tune.reserved_bufs)) - s->si[0].flags &= ~SI_FL_RXBLK_ROOM; - else if (!s->res.buf.size && !s->res.pipe && (s->si[1].flags & SI_FL_RXBLK_ROOM) && + si_rx_buff_rdy(&s->si[0]); + else if (!s->res.buf.size && !s->res.pipe && (s->si[1].flags & SI_FL_RXBLK_BUFF) && b_alloc_margin(&s->res.buf, 0)) - s->si[1].flags &= ~SI_FL_RXBLK_ROOM; + si_rx_buff_rdy(&s->si[1]); else return 0;