mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-06 23:27:04 +02:00
[MEDIUM] got rid of event_{cli,srv}_write() in favor of stream_sock_write()
The timeouts, expiration timers and results are now stored in the buffers. The timers will have to change a bit to become more flexible, and when the I/O completion functions will be written, the connect_complete() will have to be extracted from the write() function.
This commit is contained in:
parent
d797128d6e
commit
f8306d5391
@ -29,11 +29,9 @@
|
|||||||
#include <common/config.h>
|
#include <common/config.h>
|
||||||
|
|
||||||
|
|
||||||
|
/* main event functions used to move data between sockets and buffers */
|
||||||
int stream_sock_read(int fd);
|
int stream_sock_read(int fd);
|
||||||
|
int stream_sock_write(int fd);
|
||||||
/* FIXME: merge those ones together */
|
|
||||||
int event_cli_write(int fd);
|
|
||||||
int event_srv_write(int fd);
|
|
||||||
|
|
||||||
|
|
||||||
/* This either returns the sockname or the original destination address. Code
|
/* This either returns the sockname or the original destination address. Code
|
||||||
|
@ -429,7 +429,7 @@ int connect_server(struct session *s)
|
|||||||
fdtab[fd].state = FD_STCONN; /* connection in progress */
|
fdtab[fd].state = FD_STCONN; /* connection in progress */
|
||||||
fdtab[fd].cb[DIR_RD].f = &stream_sock_read;
|
fdtab[fd].cb[DIR_RD].f = &stream_sock_read;
|
||||||
fdtab[fd].cb[DIR_RD].b = s->rep;
|
fdtab[fd].cb[DIR_RD].b = s->rep;
|
||||||
fdtab[fd].cb[DIR_WR].f = &event_srv_write;
|
fdtab[fd].cb[DIR_WR].f = &stream_sock_write;
|
||||||
fdtab[fd].cb[DIR_WR].b = s->req;
|
fdtab[fd].cb[DIR_WR].b = s->req;
|
||||||
|
|
||||||
FD_SET(fd, StaticWriteEvent); /* for connect status */
|
FD_SET(fd, StaticWriteEvent); /* for connect status */
|
||||||
|
@ -340,7 +340,7 @@ int event_accept(int fd) {
|
|||||||
fdtab[cfd].state = FD_STREADY;
|
fdtab[cfd].state = FD_STREADY;
|
||||||
fdtab[cfd].cb[DIR_RD].f = &stream_sock_read;
|
fdtab[cfd].cb[DIR_RD].f = &stream_sock_read;
|
||||||
fdtab[cfd].cb[DIR_RD].b = s->req;
|
fdtab[cfd].cb[DIR_RD].b = s->req;
|
||||||
fdtab[cfd].cb[DIR_WR].f = &event_cli_write;
|
fdtab[cfd].cb[DIR_WR].f = &stream_sock_write;
|
||||||
fdtab[cfd].cb[DIR_WR].b = s->rep;
|
fdtab[cfd].cb[DIR_WR].b = s->rep;
|
||||||
|
|
||||||
if ((p->mode == PR_MODE_HTTP && (s->flags & SN_MONITOR)) ||
|
if ((p->mode == PR_MODE_HTTP && (s->flags & SN_MONITOR)) ||
|
||||||
|
6
src/fd.c
6
src/fd.c
@ -15,12 +15,6 @@
|
|||||||
* - we still use 'listeners' to check whether we want to stop or not.
|
* - we still use 'listeners' to check whether we want to stop or not.
|
||||||
* - the various pollers should be moved to other external files, possibly
|
* - the various pollers should be moved to other external files, possibly
|
||||||
* dynamic libs.
|
* dynamic libs.
|
||||||
* - stream_sock_read() : It may be called from event_accept().
|
|
||||||
* - extract the connect code from event_srv_write()
|
|
||||||
* => event_tcp_connect(). It must then call event_write().
|
|
||||||
* - merge the remaining event_cli_write() and event_srv_write()
|
|
||||||
* => single event_tcp_write(). Check buffer, fd_state, res*, and timeouts.
|
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
@ -144,21 +144,19 @@ int stream_sock_read(int fd) {
|
|||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* this function is called on a write event from a client socket.
|
* this function is called on a write event from a stream socket.
|
||||||
* It returns 0.
|
* It returns 0.
|
||||||
*/
|
*/
|
||||||
int event_cli_write(int fd) {
|
int stream_sock_write(int fd) {
|
||||||
struct task *t = fdtab[fd].owner;
|
|
||||||
struct buffer *b = fdtab[fd].cb[DIR_WR].b;
|
struct buffer *b = fdtab[fd].cb[DIR_WR].b;
|
||||||
int ret, max;
|
int ret, max;
|
||||||
|
|
||||||
#ifdef DEBUG_FULL
|
#ifdef DEBUG_FULL
|
||||||
fprintf(stderr,"event_cli_write : fd=%d, owner=%p\n", fd, fdtab[fd].owner);
|
fprintf(stderr,"stream_sock_write : fd=%d, owner=%p\n", fd, fdtab[fd].owner);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (b->l == 0) { /* let's realign the buffer to optimize I/O */
|
if (b->l == 0) { /* let's realign the buffer to optimize I/O */
|
||||||
b->r = b->w = b->h = b->lr = b->data;
|
b->r = b->w = b->h = b->lr = b->data;
|
||||||
// max = BUFSIZE; BUG !!!!
|
|
||||||
max = 0;
|
max = 0;
|
||||||
}
|
}
|
||||||
else if (b->r > b->w) {
|
else if (b->r > b->w) {
|
||||||
@ -169,8 +167,24 @@ int event_cli_write(int fd) {
|
|||||||
|
|
||||||
if (fdtab[fd].state != FD_STERROR) {
|
if (fdtab[fd].state != FD_STERROR) {
|
||||||
if (max == 0) {
|
if (max == 0) {
|
||||||
|
/* may be we have received a connection acknowledgement in TCP mode without data */
|
||||||
|
if (fdtab[fd].state == FD_STCONN) {
|
||||||
|
int skerr;
|
||||||
|
socklen_t lskerr = sizeof(skerr);
|
||||||
|
getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
|
||||||
|
if (skerr) {
|
||||||
|
b->flags |= BF_WRITE_ERROR;
|
||||||
|
fdtab[fd].state = FD_STERROR;
|
||||||
|
task_wakeup(&rq, fdtab[fd].owner);
|
||||||
|
tv_eternity(&b->wex);
|
||||||
|
FD_CLR(fd, StaticWriteEvent);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
b->flags |= BF_WRITE_NULL;
|
b->flags |= BF_WRITE_NULL;
|
||||||
task_wakeup(&rq, t);
|
task_wakeup(&rq, fdtab[fd].owner);
|
||||||
|
fdtab[fd].state = FD_STREADY;
|
||||||
tv_eternity(&b->wex);
|
tv_eternity(&b->wex);
|
||||||
FD_CLR(fd, StaticWriteEvent);
|
FD_CLR(fd, StaticWriteEvent);
|
||||||
return 0;
|
return 0;
|
||||||
@ -202,7 +216,7 @@ int event_cli_write(int fd) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (ret == 0) {
|
else if (ret == 0) {
|
||||||
/* nothing written, just make as if we were never called */
|
/* nothing written, just pretend we were never called */
|
||||||
// b->flags |= BF_WRITE_NULL;
|
// b->flags |= BF_WRITE_NULL;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -229,125 +243,12 @@ int event_cli_write(int fd) {
|
|||||||
else
|
else
|
||||||
tv_eternity(&b->wex);
|
tv_eternity(&b->wex);
|
||||||
|
|
||||||
task_wakeup(&rq, t);
|
task_wakeup(&rq, fdtab[fd].owner);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* this function is called on a write event from a server socket.
|
|
||||||
* It returns 0.
|
|
||||||
*/
|
|
||||||
int event_srv_write(int fd) {
|
|
||||||
struct task *t = fdtab[fd].owner;
|
|
||||||
struct buffer *b = fdtab[fd].cb[DIR_WR].b;
|
|
||||||
struct session *s = t->context;
|
|
||||||
int ret, max;
|
|
||||||
|
|
||||||
#ifdef DEBUG_FULL
|
|
||||||
fprintf(stderr,"event_srv_write : fd=%d, s=%p\n", fd, s);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (b->l == 0) { /* let's realign the buffer to optimize I/O */
|
|
||||||
b->r = b->w = b->h = b->lr = b->data;
|
|
||||||
// max = BUFSIZE; BUG !!!!
|
|
||||||
max = 0;
|
|
||||||
}
|
|
||||||
else if (b->r > b->w) {
|
|
||||||
max = b->r - b->w;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
max = b->data + BUFSIZE - b->w;
|
|
||||||
|
|
||||||
if (fdtab[fd].state != FD_STERROR) {
|
|
||||||
if (max == 0) {
|
|
||||||
/* may be we have received a connection acknowledgement in TCP mode without data */
|
|
||||||
if (s->srv_state == SV_STCONN) {
|
|
||||||
int skerr;
|
|
||||||
socklen_t lskerr = sizeof(skerr);
|
|
||||||
getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
|
|
||||||
if (skerr) {
|
|
||||||
b->flags |= BF_WRITE_ERROR;
|
|
||||||
fdtab[fd].state = FD_STERROR;
|
|
||||||
task_wakeup(&rq, t);
|
|
||||||
tv_eternity(&b->wex);
|
|
||||||
FD_CLR(fd, StaticWriteEvent);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
b->flags |= BF_WRITE_NULL;
|
|
||||||
task_wakeup(&rq, t);
|
|
||||||
fdtab[fd].state = FD_STREADY;
|
|
||||||
tv_eternity(&b->wex);
|
|
||||||
FD_CLR(fd, StaticWriteEvent);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifndef MSG_NOSIGNAL
|
|
||||||
{
|
|
||||||
int skerr;
|
|
||||||
socklen_t lskerr = sizeof(skerr);
|
|
||||||
getsockopt(fd, SOL_SOCKET, SO_ERROR, &skerr, &lskerr);
|
|
||||||
if (skerr)
|
|
||||||
ret = -1;
|
|
||||||
else
|
|
||||||
ret = send(fd, b->w, max, MSG_DONTWAIT);
|
|
||||||
}
|
|
||||||
#else
|
|
||||||
ret = send(fd, b->w, max, MSG_DONTWAIT | MSG_NOSIGNAL);
|
|
||||||
#endif
|
|
||||||
fdtab[fd].state = FD_STREADY;
|
|
||||||
if (ret > 0) {
|
|
||||||
b->l -= ret;
|
|
||||||
b->w += ret;
|
|
||||||
|
|
||||||
b->flags |= BF_PARTIAL_WRITE;
|
|
||||||
|
|
||||||
if (b->w == b->data + BUFSIZE) {
|
|
||||||
b->w = b->data; /* wrap around the buffer */
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (ret == 0) {
|
|
||||||
/* nothing written, just make as if we were never called */
|
|
||||||
// b->flags |= BF_WRITE_NULL;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
else if (errno == EAGAIN) /* ignore EAGAIN */
|
|
||||||
return 0;
|
|
||||||
else {
|
|
||||||
b->flags |= BF_WRITE_ERROR;
|
|
||||||
fdtab[fd].state = FD_STERROR;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
b->flags |= BF_WRITE_ERROR;
|
|
||||||
fdtab[fd].state = FD_STERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* We don't want to re-arm read/write timeouts if we're trying to connect,
|
|
||||||
* otherwise it could loop indefinitely !
|
|
||||||
*/
|
|
||||||
if (s->srv_state != SV_STCONN) {
|
|
||||||
if (b->wto) {
|
|
||||||
tv_delayfrom(&b->wex, &now, b->wto);
|
|
||||||
/* FIXME: to prevent the client from expiring read timeouts during writes,
|
|
||||||
* we refresh it. A solution would be to merge read+write timeouts into a
|
|
||||||
* unique one, although that needs some study particularly on full-duplex
|
|
||||||
* TCP connections. */
|
|
||||||
b->rex = b->wex;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
tv_eternity(&b->wex);
|
|
||||||
}
|
|
||||||
|
|
||||||
task_wakeup(&rq, t);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Local variables:
|
* Local variables:
|
||||||
* c-indent-level: 8
|
* c-indent-level: 8
|
||||||
|
Loading…
Reference in New Issue
Block a user