[MEDIUM] minor update to the task api: let the scheduler queue itself

All the tasks callbacks had to requeue the task themselves, and update
a global timeout. This was not convenient at all. Now the API has been
simplified. The tasks callbacks only have to update their expire timer,
and return either a pointer to the task or NULL if the task has been
deleted. The scheduler will take care of requeuing the task at the
proper place in the wait queue.
This commit is contained in:
Willy Tarreau 2009-03-08 09:38:41 +01:00
parent 4136522527
commit 26c250683f
11 changed files with 42 additions and 51 deletions

View File

@ -38,7 +38,7 @@ int match_str(const void *key1, const void *key2);
/* Callback for destroy */
void destroy(appsess *data);
void appsession_refresh(struct task *t, int *next);
struct task *appsession_refresh(struct task *t);
int appsession_task_init(void);
int appsession_init(void);
void appsession_cleanup(void);

View File

@ -2,7 +2,7 @@
include/proto/checks.h
Functions prototypes for the checks.
Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu
Copyright (C) 2000-2009 Willy Tarreau - w@1wt.eu
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
@ -25,7 +25,7 @@
#include <types/task.h>
#include <common/config.h>
void process_chk(struct task *t, struct timeval *next);
struct task *process_chk(struct task *t);
int start_checks();
#endif /* _PROTO_CHECKS_H */

View File

@ -28,8 +28,7 @@
int uxst_event_accept(int fd);
void uxst_add_listener(struct listener *listener);
void process_uxst_stats(struct task *t, int *next);
void uxst_process_session(struct task *t, int *next);
struct task *uxst_process_session(struct task *t);
#endif /* _PROTO_PROTO_UXST_H */

View File

@ -36,7 +36,7 @@ int init_session();
void session_process_counters(struct session *s);
void sess_change_server(struct session *sess, struct server *newsrv);
void process_session(struct task *t, int *next);
struct task *process_session(struct task *t);
static void inline trace_term(struct session *s, unsigned int code)
{

View File

@ -85,7 +85,7 @@ struct listener {
struct listener *next; /* next address for the same proxy, or NULL */
struct list proto_list; /* list in the protocol header */
int (*accept)(int fd); /* accept() function passed to fdtab[] */
void (*handler)(struct task *t, int *next); /* protocol handler */
struct task * (*handler)(struct task *t); /* protocol handler. It is a task */
int *timeout; /* pointer to client-side timeout */
void *private; /* any private data which may be used by accept() */
unsigned int analysers; /* bitmap of required protocol analysers */

View File

@ -50,11 +50,18 @@ struct task {
struct eb32_node rq; /* ebtree node used to hold the task in the run queue */
int state; /* task state : bit field of TASK_* */
unsigned int expire; /* next expiration time for this task */
void (*process)(struct task *t, int *next); /* the function which processes the task */
struct task * (*process)(struct task *t); /* the function which processes the task */
void *context; /* the task's context */
int nice; /* the task's current nice value from -1024 to +1024 */
};
/*
* The task callback (->process) is responsible for updating ->expire. It must
* return a pointer to the task itself, except if the task has been deleted, in
* which case it returns NULL so that the scheduler knows it must not check the
* expire timer. The scheduler will requeue the task at the proper location.
*/
#endif /* _TYPES_TASK_H */
/*

View File

@ -2,7 +2,7 @@
* AppSession functions.
*
* Copyright 2004-2006 Alexander Lazic, Klaus Wagner
* Copyright 2006-2007 Willy Tarreau
* Copyright 2006-2009 Willy Tarreau
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
@ -100,7 +100,7 @@ int appsession_task_init(void)
return 0;
}
void appsession_refresh(struct task *t, int *next)
struct task *appsession_refresh(struct task *t)
{
struct proxy *p = proxy;
struct appsession_hash *htbl;
@ -131,8 +131,7 @@ void appsession_refresh(struct task *t, int *next)
p = p->next;
}
t->expire = tick_add(now_ms, MS_TO_TICKS(TBLCHKINT)); /* check expiration every 5 seconds */
task_queue(t);
*next = t->expire;
return t;
} /* end appsession_refresh */
int match_str(const void *key1, const void *key2)

View File

@ -1,7 +1,7 @@
/*
* Health-checks functions.
*
* Copyright 2000-2008 Willy Tarreau <w@1wt.eu>
* Copyright 2000-2009 Willy Tarreau <w@1wt.eu>
* Copyright 2007-2008 Krzysztof Piotr Oledzki <ole@ans.pl>
*
* This program is free software; you can redistribute it and/or
@ -522,9 +522,8 @@ static int event_srv_chk_r(int fd)
* manages a server health-check. Returns
* the time the task accepts to wait, or TIME_ETERNITY for infinity.
*/
void process_chk(struct task *t, int *next)
struct task *process_chk(struct task *t)
{
__label__ new_chk, out;
struct server *s = t->context;
struct sockaddr_in sa;
int fd;
@ -536,11 +535,8 @@ void process_chk(struct task *t, int *next)
fd = s->curfd;
if (fd < 0) { /* no check currently running */
//fprintf(stderr, "process_chk: 2\n");
if (!tick_is_expired(t->expire, now_ms)) { /* not good time yet */
task_queue(t); /* restore t to its place in the task list */
*next = t->expire;
goto out;
}
if (!tick_is_expired(t->expire, now_ms)) /* woke up too early */
return t;
/* we don't send any health-checks when the proxy is stopped or when
* the server should not be checked.
@ -548,9 +544,7 @@ void process_chk(struct task *t, int *next)
if (!(s->state & SRV_CHECKED) || s->proxy->state == PR_STSTOPPED) {
while (tick_is_expired(t->expire, now_ms))
t->expire = tick_add(t->expire, MS_TO_TICKS(s->inter));
task_queue(t); /* restore t to its place in the task list */
*next = t->expire;
goto out;
return t;
}
/* we'll initiate a new check */
@ -674,10 +668,7 @@ void process_chk(struct task *t, int *next)
int t_con = tick_add(now_ms, s->proxy->timeout.connect);
t->expire = tick_first(t->expire, t_con);
}
task_queue(t); /* restore t to its place in the task list */
*next = t->expire;
return;
return t;
}
else if (errno != EALREADY && errno != EISCONN && errno != EAGAIN) {
s->result |= SRV_CHK_ERROR; /* a real error */
@ -797,10 +788,7 @@ void process_chk(struct task *t, int *next)
}
//fprintf(stderr, "process_chk: 11\n");
s->result = SRV_CHK_UNKNOWN;
task_queue(t); /* restore t to its place in the task list */
*next = t->expire;
out:
return;
return t;
}
/*

View File

@ -708,7 +708,7 @@ int uxst_req_analyser_stats(struct session *s, struct buffer *req)
* still exists but remains in SI_ST_INI state forever, so that any call is a
* NOP.
*/
void uxst_process_session(struct task *t, int *next)
struct task *uxst_process_session(struct task *t)
{
struct session *s = t->context;
int resync;
@ -969,11 +969,7 @@ void uxst_process_session(struct task *t, int *next)
if (s->si[0].exp)
t->expire = tick_first(t->expire, s->si[0].exp);
/* restore t to its place in the task list */
task_queue(t);
*next = t->expire;
return; /* nothing more to do */
return t;
}
actconn--;
@ -988,10 +984,10 @@ void uxst_process_session(struct task *t, int *next)
}
/* the task MUST not be in the run queue anymore */
task_delete(t);
session_free(s);
task_delete(t);
task_free(t);
*next = TICK_ETERNITY;
return NULL;
}
__attribute__((constructor))

View File

@ -554,7 +554,7 @@ static void sess_prepare_conn_req(struct session *s, struct stream_interface *si
* and each function is called only if at least another function has changed at
* least one flag it is interested in.
*/
void process_session(struct task *t, int *next)
struct task *process_session(struct task *t)
{
struct session *s = t->context;
int resync;
@ -1029,16 +1029,13 @@ resync_stream_interface:
fprintf(stderr, "[%u] queuing with exp=%u req->rex=%u req->wex=%u req->ana_exp=%u rep->rex=%u rep->wex=%u, cs=%d, ss=%d\n",
now_ms, t->expire, s->req->rex, s->req->wex, s->req->analyse_exp, s->rep->rex, s->rep->wex, s->si[0].state, s->si[1].state);
#endif
/* restore t to its place in the task list */
task_queue(t);
#ifdef DEBUG_DEV
/* this may only happen when no timeout is set or in case of an FSM bug */
if (!t->expire)
ABORT_NOW();
#endif
*next = t->expire;
return; /* nothing more to do */
return t; /* nothing more to do */
}
s->fe->feconn--;
@ -1066,10 +1063,10 @@ resync_stream_interface:
}
/* the task MUST not be in the run queue anymore */
task_delete(t);
session_free(s);
task_delete(t);
task_free(t);
*next = TICK_ETERNITY;
return NULL;
}
/*

View File

@ -295,11 +295,11 @@ void wake_expired_tasks(int *next)
*/
void process_runnable_tasks(int *next)
{
int temp;
struct task *t;
struct eb32_node *eb;
unsigned int tree, stop;
unsigned int max_processed;
int expire;
if (!run_queue)
return;
@ -315,6 +315,7 @@ void process_runnable_tasks(int *next)
stop = (tree + TIMER_TREES / 2) & TIMER_TREE_MASK;
tree = (tree - 1) & TIMER_TREE_MASK;
expire = *next;
do {
eb = eb32_first(&rqueue[tree]);
while (eb) {
@ -325,15 +326,19 @@ void process_runnable_tasks(int *next)
__task_unlink_rq(t);
t->state |= TASK_RUNNING;
t->process(t, &temp);
if (likely(t->process(t) != NULL)) {
t->state &= ~TASK_RUNNING;
*next = tick_first(*next, temp);
expire = tick_first(expire, t->expire);
task_queue(t);
}
if (!--max_processed)
return;
goto out;
}
tree = (tree + 1) & TIMER_TREE_MASK;
} while (tree != stop);
out:
*next = expire;
}
/* perform minimal intializations, report 0 in case of error, 1 if OK. */