diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index f408f96b6..6ee6ebfc6 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -2,7 +2,7 @@ include/proto/stream_interface.h This file contains stream_interface function prototypes - Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu + Copyright (C) 2000-2009 Willy Tarreau - w@1wt.eu This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public @@ -34,6 +34,20 @@ void stream_int_report_error(struct stream_interface *si); void stream_int_return(struct stream_interface *si, const struct chunk *msg); void stream_int_retnclose(struct stream_interface *si, const struct chunk *msg); +/* functions used when running a stream interface as a task */ +void stream_int_update(struct stream_interface *si); +void stream_int_update_embedded(struct stream_interface *si); +void stream_int_shutr(struct stream_interface *si); +void stream_int_shutw(struct stream_interface *si); +void stream_int_chk_rcv(struct stream_interface *si); +void stream_int_chk_snd(struct stream_interface *si); + +struct task *stream_int_register_handler(struct stream_interface *si, + void (*fct)(struct stream_interface *)); +struct task *stream_int_register_handler_task(struct stream_interface *si, + struct task *(*fct)(struct task *)); +void stream_int_unregister_handler(struct stream_interface *si); + #endif /* _PROTO_STREAM_INTERFACE_H */ /* diff --git a/include/types/stream_interface.h b/include/types/stream_interface.h index 70b8cd596..9dfafcb6b 100644 --- a/include/types/stream_interface.h +++ b/include/types/stream_interface.h @@ -76,6 +76,10 @@ enum { struct server; struct proxy; +/* Note that if an iohandler is set, the update function will not be called by + * the session handler, so it may be used to resync flags at the end of the I/O + * handler. See stream_int_update_embedded() for reference. + */ struct stream_interface { unsigned int state; /* SI_ST* */ unsigned int prev_state;/* SI_ST*, copy of previous state */ diff --git a/src/stream_interface.c b/src/stream_interface.c index 76f447d32..b42541943 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -1,7 +1,7 @@ /* * Functions managing stream_interface structures * - * Copyright 2000-2008 Willy Tarreau + * Copyright 2000-2009 Willy Tarreau * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License @@ -93,6 +93,255 @@ void stream_int_retnclose(struct stream_interface *si, const struct chunk *msg) buffer_auto_close(si->ob); } +/* default update function for scheduled tasks, not used for embedded tasks */ +void stream_int_update(struct stream_interface *si) +{ + DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n", + __FUNCTION__, + si, si->state, si->ib->flags, si->ob->flags); + + if (!(si->flags & SI_FL_DONT_WAKE) && si->owner) + task_wakeup(si->owner, TASK_WOKEN_IO); +} + +/* default update function for embedded tasks, to be used at the end of the i/o handler */ +void stream_int_update_embedded(struct stream_interface *si) +{ + DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n", + __FUNCTION__, + si, si->state, si->ib->flags, si->ob->flags); + + if (si->state != SI_ST_EST) + return; + + if ((si->ob->flags & (BF_OUT_EMPTY|BF_SHUTW|BF_HIJACK|BF_SHUTW_NOW)) == (BF_OUT_EMPTY|BF_SHUTW_NOW)) + si->shutw(si); + + if ((si->ob->flags & (BF_FULL|BF_SHUTW|BF_SHUTW_NOW|BF_HIJACK)) == 0) + si->flags |= SI_FL_WAIT_DATA; + + if ((si->ib->flags & (BF_FULL|BF_SHUTR)) == BF_FULL) + si->flags |= SI_FL_WAIT_ROOM; + + if (si->ob->flags & BF_WRITE_ACTIVITY || si->ib->flags & BF_READ_ACTIVITY) { + if (tick_isset(si->ib->rex)) + si->ib->rex = tick_add_ifset(now_ms, si->ib->rto); + if (tick_isset(si->ob->wex)) + si->ob->wex = tick_add_ifset(now_ms, si->ob->wto); + } + + if (si->ob->flags & BF_WRITE_PARTIAL) + si->ob->prod->chk_rcv(si->ob->prod); + + if (si->ib->flags & BF_READ_PARTIAL) + si->ib->cons->chk_snd(si->ib->cons); + + /* Note that we're trying to wake up in two conditions here : + * - special event, which needs the holder task attention + * - status indicating that the applet can go on working. This + * is rather hard because we might be blocking on output and + * don't want to wake up on input and vice-versa. The idea is + * the to only rely the changes the chk_* might have performed. + */ + if (/* check stream interface changes */ + (si->flags & SI_FL_ERR) || si->state != SI_ST_EST || si->ib->cons->state != SI_ST_EST || + /* check response buffer changes */ + (si->ib->flags & (BF_READ_NULL|BF_READ_ERROR|BF_READ_DONTWAIT)) || + ((si->ib->flags & BF_READ_ACTIVITY) && !si->ib->to_forward) || + (!(si->ib->flags & BF_FULL) && (si->ib->flags & BF_WRITE_ACTIVITY) && si->ib->to_forward) || + /* check request buffer changes */ + (si->ob->flags & (BF_WRITE_ERROR)) || + ((si->ob->flags & BF_WRITE_ACTIVITY) && (si->ob->flags & BF_OUT_EMPTY) && !si->ob->to_forward) || + (si->ob->flags & BF_READ_ACTIVITY)) { + if (!(si->flags & SI_FL_DONT_WAKE) && si->owner) + task_wakeup(si->owner, TASK_WOKEN_IO); + } +} + +/* default shutr function for scheduled tasks */ +void stream_int_shutr(struct stream_interface *si) +{ + DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n", + __FUNCTION__, + si, si->state, si->ib->flags, si->ob->flags); + + si->ib->flags &= ~BF_SHUTR_NOW; + if (si->ib->flags & BF_SHUTR) + return; + si->ib->flags |= BF_SHUTR; + si->ib->rex = TICK_ETERNITY; + si->flags &= ~SI_FL_WAIT_ROOM; + + if (si->state != SI_ST_EST && si->state != SI_ST_CON) + return; + + if (si->ob->flags & BF_SHUTW) { + si->state = SI_ST_DIS; + si->exp = TICK_ETERNITY; + } + + /* note that if the task exist, it must unregister itself once it runs */ + if (!(si->flags & SI_FL_DONT_WAKE) && si->owner) + task_wakeup(si->owner, TASK_WOKEN_IO); +} + +/* default shutw function for scheduled tasks */ +void stream_int_shutw(struct stream_interface *si) +{ + DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n", + __FUNCTION__, + si, si->state, si->ib->flags, si->ob->flags); + + si->ob->flags &= ~BF_SHUTW_NOW; + if (si->ob->flags & BF_SHUTW) + return; + si->ob->flags |= BF_SHUTW; + si->ob->wex = TICK_ETERNITY; + si->flags &= ~SI_FL_WAIT_DATA; + + switch (si->state) { + case SI_ST_EST: + if (!(si->ib->flags & BF_SHUTR)) + break; + + /* fall through */ + case SI_ST_CON: + case SI_ST_CER: + si->state = SI_ST_DIS; + /* fall through */ + default: + si->flags &= ~SI_FL_WAIT_ROOM; + si->ib->flags |= BF_SHUTR; + si->ib->rex = TICK_ETERNITY; + si->exp = TICK_ETERNITY; + } + + /* note that if the task exist, it must unregister itself once it runs */ + if (!(si->flags & SI_FL_DONT_WAKE) && si->owner) + task_wakeup(si->owner, TASK_WOKEN_IO); +} + +/* default chk_rcv function for scheduled tasks */ +void stream_int_chk_rcv(struct stream_interface *si) +{ + struct buffer *ib = si->ib; + + DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n", + __FUNCTION__, + si, si->state, si->ib->flags, si->ob->flags); + + if (unlikely(si->state != SI_ST_EST || (ib->flags & BF_SHUTR))) + return; + + if (ib->flags & (BF_FULL|BF_HIJACK)) { + /* stop reading */ + if ((ib->flags & (BF_FULL|BF_HIJACK)) == BF_FULL) + si->flags |= SI_FL_WAIT_ROOM; + } + else { + /* (re)start reading */ + si->flags &= ~SI_FL_WAIT_ROOM; + if (!(si->flags & SI_FL_DONT_WAKE) && si->owner) + task_wakeup(si->owner, TASK_WOKEN_IO); + } +} + +/* default chk_snd function for scheduled tasks */ +void stream_int_chk_snd(struct stream_interface *si) +{ + struct buffer *ob = si->ob; + + DPRINTF(stderr, "%s: si=%p, si->state=%d ib->flags=%08x ob->flags=%08x\n", + __FUNCTION__, + si, si->state, si->ib->flags, si->ob->flags); + + if (unlikely(si->state != SI_ST_EST || (si->ob->flags & BF_SHUTW))) + return; + + if (!(si->flags & SI_FL_WAIT_DATA) || /* not waiting for data */ + (ob->flags & BF_OUT_EMPTY)) /* called with nothing to send ! */ + return; + + /* Otherwise there are remaining data to be sent in the buffer, + * so we tell the handler. + */ + si->flags &= ~SI_FL_WAIT_DATA; + if (!tick_isset(ob->wex)) + ob->wex = tick_add_ifset(now_ms, ob->wto); + + if (!(si->flags & SI_FL_DONT_WAKE) && si->owner) + task_wakeup(si->owner, TASK_WOKEN_IO); +} + +/* Register a function to handle a stream_interface as part of the stream + * interface's owner task, which is returned. The SI will wake it up everytime + * it is solicited. The task's processing function must call the specified + * function before returning. It must be deleted by the task handler using + * stream_int_unregister_handler(), possibly from withing the function itself. + */ +struct task *stream_int_register_handler(struct stream_interface *si, + void (*fct)(struct stream_interface *)) +{ + DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", fct, si, si->owner); + + si->update = stream_int_update_embedded; + si->shutr = stream_int_shutr; + si->shutw = stream_int_shutw; + si->chk_rcv = stream_int_chk_rcv; + si->chk_snd = stream_int_chk_snd; + si->connect = NULL; + si->iohandler = fct; + si->flags |= SI_FL_WAIT_DATA; + return si->owner; +} + +/* Register a function to handle a stream_interface as a standalone task. The + * new task itself is returned and is assigned as si->owner. The stream_interface + * pointer will be pointed to by the task's context. The handler can be detached + * by using stream_int_unregister_handler(). + */ +struct task *stream_int_register_handler_task(struct stream_interface *si, + struct task *(*fct)(struct task *)) +{ + struct task *t; + + DPRINTF(stderr, "registering handler %p for si %p (was %p)\n", fct, si, si->owner); + + si->update = stream_int_update; + si->shutr = stream_int_shutr; + si->shutw = stream_int_shutw; + si->chk_rcv = stream_int_chk_rcv; + si->chk_snd = stream_int_chk_snd; + si->connect = NULL; + si->iohandler = NULL; /* not used when running as an external task */ + si->flags |= SI_FL_WAIT_DATA; + + t = task_new(); + si->owner = t; + if (!t) + return t; + t->process = fct; + t->context = si; + task_wakeup(si->owner, TASK_WOKEN_INIT); + + return t; +} + +/* Unregister a stream interface handler. This must be called by the handler task + * itself when it detects that it is in the SI_ST_DIS state. This function can + * both detach standalone handlers and embedded handlers. + */ +void stream_int_unregister_handler(struct stream_interface *si) +{ + if (!si->iohandler && si->owner) { + /* external handler : kill the task */ + task_delete(si->owner); + task_free(si->owner); + } + si->iohandler = NULL; + si->owner = NULL; +} + /* * Local variables: * c-indent-level: 8