MINOR: connections: Introduce an unsubscribe method.

As we don't know how subscriptions are handled, we can't just assume we can
use LIST_DEL() to unsubscribe, so introduce a new method to mux and connections
to do so.
This commit is contained in:
Olivier Houchard 2018-09-28 17:57:58 +02:00 committed by Willy Tarreau
parent 5ab01cb011
commit 83a0cd8a36
7 changed files with 79 additions and 17 deletions

View File

@ -51,6 +51,8 @@ int make_proxy_line_v1(char *buf, int buf_len, struct sockaddr_storage *src, str
int make_proxy_line_v2(char *buf, int buf_len, struct server *srv, struct connection *remote);
int conn_subscribe(struct connection *conn, int event_type, void *param);
int conn_unsubscribe(struct connection *conn, int event_type, void *param);
/* receive a NetScaler Client IP insertion header over a connection */
int conn_recv_netscaler_cip(struct connection *conn, int flag);

View File

@ -301,6 +301,7 @@ struct xprt_ops {
int (*get_alpn)(const struct connection *conn, const char **str, int *len); /* get application layer name */
char name[8]; /* transport layer name, zero-terminated */
int (*subscribe)(struct connection *conn, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
int (*unsubscribe)(struct connection *conn, int event_type, void *param); /* Unsubscribe to events */
};
/* mux_ops describes the mux operations, which are to be performed at the
@ -325,6 +326,7 @@ struct mux_ops {
void (*detach)(struct conn_stream *); /* Detach a conn_stream from an outgoing connection, when the request is done */
void (*show_fd)(struct buffer *, struct connection *); /* append some data about connection into chunk for "show fd" */
int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
int (*unsubscribe)(struct conn_stream *cs, int event_type, void *param); /* Unsubscribe to events */
unsigned int flags; /* some flags characterizing the mux's capabilities (MX_FL_*) */
char name[8]; /* mux layer name, zero-terminated */
};
@ -338,7 +340,6 @@ struct mux_ops {
*/
struct data_cb {
int (*wake)(struct conn_stream *cs); /* data-layer callback to report activity */
int (*subscribe)(struct conn_stream *cs, int event_type, void *param); /* Subscribe to events, such as "being able to send" */
char name[8]; /* data layer name, zero-terminated */
};

View File

@ -358,12 +358,38 @@ int conn_sock_send(struct connection *conn, const void *buf, int len, int flags)
return ret;
}
int conn_unsubscribe(struct connection *conn, int event_type, void *param)
{
struct wait_list *sw;
if (event_type & SUB_CAN_RECV) {
sw = param;
if (sw->wait_reason & SUB_CAN_RECV) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
sw->wait_reason &= ~SUB_CAN_RECV;
if (sw->wait_reason & SUB_CAN_SEND)
LIST_ADDQ(&conn->send_wait_list, &sw->list);
}
}
if (event_type & SUB_CAN_SEND) {
sw = param;
if (sw->wait_reason & SUB_CAN_SEND) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
sw->wait_reason &= ~SUB_CAN_SEND;
if (sw->wait_reason & SUB_CAN_RECV)
LIST_ADDQ(&conn->recv_wait_list, &sw->list);
}
}
return 0;
}
int conn_subscribe(struct connection *conn, int event_type, void *param)
{
struct wait_list *sw;
switch (event_type) {
case SUB_CAN_RECV:
if (event_type & SUB_CAN_RECV) {
sw = param;
if (!(sw->wait_reason & SUB_CAN_RECV)) {
sw->wait_reason |= SUB_CAN_RECV;
@ -377,8 +403,9 @@ int conn_subscribe(struct connection *conn, int event_type, void *param)
} else
LIST_ADDQ(&conn->recv_wait_list, &sw->list);
}
return 0;
case SUB_CAN_SEND:
event_type &= ~SUB_CAN_RECV;
}
if (event_type & SUB_CAN_SEND) {
sw = param;
if (!(sw->wait_reason & SUB_CAN_SEND)) {
sw->wait_reason |= SUB_CAN_SEND;
@ -392,11 +419,11 @@ int conn_subscribe(struct connection *conn, int event_type, void *param)
} else
LIST_ADDQ(&conn->send_wait_list, &sw->list);
}
return 0;
default:
break;
event_type &= ~SUB_CAN_SEND;
}
return (-1);
if (event_type != 0)
return (-1);
return 0;
}
/* Drains possibly pending incoming data on the file descriptor attached to the

View File

@ -3541,16 +3541,16 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
struct h2s *h2s = cs->ctx;
struct h2c *h2c = h2s->h2c;
switch (event_type) {
case SUB_CAN_RECV:
if (event_type & SUB_CAN_RECV) {
sw = param;
if (!(sw->wait_reason & SUB_CAN_RECV)) {
sw->wait_reason |= SUB_CAN_RECV;
sw->handle = h2s;
h2s->recv_wait_list = sw;
}
return 0;
case SUB_CAN_SEND:
event_type &= ~SUB_CAN_RECV;
}
if (event_type & SUB_CAN_SEND) {
sw = param;
if (!(sw->wait_reason & SUB_CAN_SEND)) {
sw->wait_reason |= SUB_CAN_SEND;
@ -3560,15 +3560,38 @@ static int h2_subscribe(struct conn_stream *cs, int event_type, void *param)
else
LIST_ADDQ(&h2c->send_list, &sw->list);
}
return 0;
default:
break;
event_type &= ~SUB_CAN_SEND;
}
return -1;
if (event_type != 0)
return -1;
return 0;
}
static int h2_unsubscribe(struct conn_stream *cs, int event_type, void *param)
{
struct wait_list *sw;
struct h2s *h2s = cs->ctx;
if (event_type & SUB_CAN_RECV) {
sw = param;
if (h2s->recv_wait_list == sw) {
sw->wait_reason &= ~SUB_CAN_RECV;
h2s->recv_wait_list = NULL;
}
}
if (event_type & SUB_CAN_SEND) {
sw = param;
if (sw->wait_reason & SUB_CAN_SEND) {
LIST_DEL(&sw->list);
LIST_INIT(&sw->list);
}
}
return 0;
}
/* Called from the upper layer, to receive data */
static size_t h2_rcv_buf(struct conn_stream *cs, struct buffer *buf, size_t count, int flags)
{
@ -3767,6 +3790,7 @@ const struct mux_ops h2_ops = {
.snd_buf = h2_snd_buf,
.rcv_buf = h2_rcv_buf,
.subscribe = h2_subscribe,
.unsubscribe = h2_unsubscribe,
.attach = h2_attach,
.detach = h2_detach,
.shutr = h2_shutr,

View File

@ -162,6 +162,11 @@ static int mux_pt_subscribe(struct conn_stream *cs, int event_type, void *param)
return (cs->conn->xprt->subscribe(cs->conn, event_type, param));
}
static int mux_pt_unsubscribe(struct conn_stream *cs, int event_type, void *param)
{
return (cs->conn->xprt->unsubscribe(cs->conn, event_type, param));
}
#if defined(CONFIG_HAP_LINUX_SPLICE)
/* Send and get, using splicing */
static int mux_pt_rcv_pipe(struct conn_stream *cs, struct pipe *pipe, unsigned int count)
@ -190,6 +195,7 @@ const struct mux_ops mux_pt_ops = {
.rcv_buf = mux_pt_rcv_buf,
.snd_buf = mux_pt_snd_buf,
.subscribe = mux_pt_subscribe,
.unsubscribe = mux_pt_unsubscribe,
#if defined(CONFIG_HAP_LINUX_SPLICE)
.rcv_pipe = mux_pt_rcv_pipe,
.snd_pipe = mux_pt_snd_pipe,

View File

@ -425,6 +425,7 @@ static struct xprt_ops raw_sock = {
.snd_buf = raw_sock_from_buf,
.rcv_buf = raw_sock_to_buf,
.subscribe = conn_subscribe,
.unsubscribe = conn_unsubscribe,
#if defined(CONFIG_HAP_LINUX_SPLICE)
.rcv_pipe = raw_sock_to_pipe,
.snd_pipe = raw_sock_from_pipe,

View File

@ -9023,6 +9023,7 @@ static struct xprt_ops ssl_sock = {
.snd_buf = ssl_sock_from_buf,
.rcv_buf = ssl_sock_to_buf,
.subscribe = conn_subscribe,
.unsubscribe = conn_unsubscribe,
.rcv_pipe = NULL,
.snd_pipe = NULL,
.shutr = NULL,