diff --git a/src/mux_h2.c b/src/mux_h2.c index 64cd59682..a0fadcd53 100644 --- a/src/mux_h2.c +++ b/src/mux_h2.c @@ -488,6 +488,8 @@ static void h2_release(struct connection *conn) } if (h2c->wait_list.task) tasklet_free(h2c->wait_list.task); + LIST_DEL(&h2c->wait_list.list); + LIST_INIT(&h2c->wait_list.list); pool_free(pool_head_h2c, h2c); } @@ -652,6 +654,8 @@ static void h2s_destroy(struct h2s *h2s) b_free(&h2s->rxbuf); offer_buffers(NULL, tasks_run_queue); } + LIST_DEL(&h2s->wait_list.list); + LIST_INIT(&h2s->wait_list.list); tasklet_free(h2s->wait_list.task); pool_free(pool_head_h2s, h2s); } @@ -1112,7 +1116,12 @@ static void h2_wake_some_streams(struct h2c *h2c, int last, uint32_t flags) } h2s->cs->flags |= flags; - h2s->cs->data_cb->wake(h2s->cs); + if (h2s->recv_wait_list) { + struct wait_list *sw = h2s->recv_wait_list; + sw->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(sw->task); + h2s->recv_wait_list = NULL; + } if (flags & CS_FL_ERROR && h2s->st < H2_SS_ERROR) h2s->st = H2_SS_ERROR; @@ -1584,7 +1593,13 @@ static int h2c_handle_rst_stream(struct h2c *h2c, struct h2s *h2s) if (h2s->cs) { h2s->cs->flags |= CS_FL_REOS | CS_FL_ERROR; - h2s->cs->data_cb->wake(h2s->cs); + if (h2s->recv_wait_list) { + struct wait_list *sw = h2s->recv_wait_list; + + sw->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(sw->task); + h2s->recv_wait_list = NULL; + } } h2s->flags |= H2_SF_RST_RCVD; @@ -1869,12 +1884,11 @@ static void h2_process_demux(struct h2c *h2c) if (tmp_h2s != h2s && h2s && h2s->cs && b_data(&h2s->rxbuf)) { /* we may have to signal the upper layers */ h2s->cs->flags |= CS_FL_RCV_MORE; - if (h2s->cs->data_cb->wake(h2s->cs) < 0) { - /* cs has just been destroyed, we have to kill h2s. */ - h2s_error(h2s, H2_ERR_STREAM_CLOSED); - goto strm_err; + if (h2s->recv_wait_list) { + h2s->recv_wait_list->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(h2s->recv_wait_list->task); + h2s->recv_wait_list = NULL; } - if (h2c->st0 >= H2_CS_ERROR) goto strm_err; @@ -2114,10 +2128,10 @@ static void h2_process_demux(struct h2c *h2c) if (h2s && h2s->cs && b_data(&h2s->rxbuf)) { /* we may have to signal the upper layers */ h2s->cs->flags |= CS_FL_RCV_MORE; - if (h2s->cs->data_cb->wake(h2s->cs) < 0) { - /* cs has just been destroyed, we have to kill h2s. */ - h2s_error(h2s, H2_ERR_STREAM_CLOSED); - h2c_send_rst_stream(h2c, h2s); + if (h2s->recv_wait_list) { + h2s->recv_wait_list->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(h2s->recv_wait_list->task); + h2s->recv_wait_list = NULL; } } return; @@ -2393,8 +2407,13 @@ static int h2_process(struct h2c *h2c) while (node) { h2s = container_of(node, struct h2s, by_id); - if (h2s->cs->flags & CS_FL_WAIT_FOR_HS) - h2s->cs->data_cb->wake(h2s->cs); + if ((h2s->cs->flags & CS_FL_WAIT_FOR_HS) && + h2s->recv_wait_list) { + struct wait_list *sw = h2s->recv_wait_list; + sw->wait_reason &= ~SUB_CAN_RECV; + tasklet_wakeup(sw->task); + h2s->recv_wait_list = NULL; + } node = eb32_next(node); } } diff --git a/src/stream.c b/src/stream.c index 667eb6a4c..cc307453a 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1655,6 +1655,10 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) si_f = &s->si[0]; si_b = &s->si[1]; + /* First, attempd to do I/Os */ + si_cs_io_cb(NULL, si_f, 0); + si_cs_io_cb(NULL, si_b, 0); + //DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__, // si_f->state, si_b->state, si_b->err_type, req->flags, res->flags); @@ -2484,6 +2488,9 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) #endif s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES); stream_release_buffers(s); + /* We may have free'd some space in buffers, or have more to send/recv, try again */ + si_cs_io_cb(NULL, si_f, 0); + si_cs_io_cb(NULL, si_b, 0); return t; /* nothing more to do */ } diff --git a/src/stream_interface.c b/src/stream_interface.c index e5ddee68f..13f7aa309 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -52,7 +52,7 @@ static void stream_int_shutw_applet(struct stream_interface *si); static void stream_int_chk_rcv_applet(struct stream_interface *si); static void stream_int_chk_snd_applet(struct stream_interface *si); static int si_cs_recv(struct conn_stream *cs); -static int si_cs_wake_cb(struct conn_stream *cs); +static int si_cs_process(struct conn_stream *cs); static int si_idle_conn_wake_cb(struct conn_stream *cs); static int si_cs_send(struct conn_stream *cs); @@ -83,7 +83,6 @@ struct si_ops si_applet_ops = { }; struct data_cb si_conn_cb = { - .wake = si_cs_wake_cb, .name = "STRM", }; @@ -554,27 +553,19 @@ void stream_int_notify(struct stream_interface *si) } -/* Callback to be used by connection I/O handlers upon completion. It propagates +/* Called by I/O handlers after completion.. It propagates * connection flags to the stream interface, updates the stream (which may or * may not take this opportunity to try to forward data), then update the * connection's polling based on the channels and stream interface's final * states. The function always returns 0. */ -static int si_cs_wake_cb(struct conn_stream *cs) +static int si_cs_process(struct conn_stream *cs) { struct connection *conn = cs->conn; struct stream_interface *si = cs->data; struct channel *ic = si_ic(si); struct channel *oc = si_oc(si); - /* if the CS's input buffer already has data available, let's try to - * receive now. The new muxes do this. The CS_FL_REOS is another cause - * for recv() (received only an empty response). - */ - if (!(cs->flags & CS_FL_EOS) && - (cs->flags & (CS_FL_DATA_RD_ENA))) - si_cs_recv(cs); - /* If we have data to send, try it now */ if (!channel_is_empty(oc) && objt_cs(si->end)) si_cs_send(objt_cs(si->end)); @@ -644,7 +635,7 @@ static int si_cs_send(struct conn_stream *cs) return 0; if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) - return 0; + return 1; if (conn->flags & CO_FL_HANDSHAKE) { /* a handshake was requested */ @@ -655,7 +646,7 @@ static int si_cs_send(struct conn_stream *cs) /* we might have been called just after an asynchronous shutw */ if (si_oc(si)->flags & CF_SHUTW) - return 0; + return 1; /* ensure it's only set if a write attempt has succeeded */ oc->flags &= ~CF_WRITE_PARTIAL; @@ -728,8 +719,10 @@ static int si_cs_send(struct conn_stream *cs) } } /* We couldn't send all of our data, let the mux know we'd like to send more */ - if (co_data(oc)) + if (co_data(oc)) { + cs_want_send(cs); conn->mux->subscribe(cs, SUB_CAN_SEND, &si->wait_list); + } wake_others: /* Maybe somebody was waiting for this conn_stream, wake them */ @@ -764,12 +757,13 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state) if (!cs) return NULL; +redo: if (!(si->wait_list.wait_reason & SUB_CAN_SEND)) ret = si_cs_send(cs); if (!(si->wait_list.wait_reason & SUB_CAN_RECV)) ret |= si_cs_recv(cs); if (ret != 0) - si_cs_wake_cb(cs); + si_cs_process(cs); return (NULL); } @@ -1015,8 +1009,9 @@ static void stream_int_chk_rcv_conn(struct stream_interface *si) if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) { /* stop reading */ - if (!(ic->flags & CF_DONT_READ)) /* full */ + if (!(ic->flags & CF_DONT_READ)) /* full */ { si->flags |= SI_FL_WAIT_ROOM; + } __cs_stop_recv(cs); } else { @@ -1157,7 +1152,7 @@ static int si_cs_recv(struct conn_stream *cs) * which rejects it before reading it all. */ if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) - return 0; + return 1; // We want to make sure si_cs_wake() is called, so that process_strema is woken up, on failure /* If another call to si_cs_recv() failed, and we subscribed to * recv events already, give up now. @@ -1167,7 +1162,7 @@ static int si_cs_recv(struct conn_stream *cs) /* maybe we were called immediately after an asynchronous shutr */ if (ic->flags & CF_SHUTR) - return 0; + return 1; /* stop here if we reached the end of data */ if (cs->flags & CS_FL_EOS) @@ -1226,7 +1221,7 @@ static int si_cs_recv(struct conn_stream *cs) goto out_shutdown_r; if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) - return cur_read != 0; + return 1; if (conn->flags & CO_FL_WAIT_ROOM) { /* the pipe is full or we have read enough data that it @@ -1385,7 +1380,7 @@ static int si_cs_recv(struct conn_stream *cs) end_recv: if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) - return cur_read != 0; + return 1; if (cs->flags & CS_FL_EOS) /* connection closed */ @@ -1402,7 +1397,7 @@ static int si_cs_recv(struct conn_stream *cs) if (ic->flags & CF_AUTO_CLOSE) channel_shutw_now(ic); stream_sock_read0(si); - return cur_read != 0; + return 1; } /*