MEDIUM: sink/ring: introduce high level ring creation helper function

ease code maintenance.
This commit is contained in:
Aurelien DARRAGON 2023-07-06 16:43:40 +02:00 committed by Christopher Faulet
parent 5a8755681d
commit b2879e3502

View File

@ -805,6 +805,49 @@ static void sink_free(struct sink *sink)
ha_free(&sink); ha_free(&sink);
} }
/* Helper function to create new high-level ring buffer (as in ring section from
* the config)
*
* Returns NULL on failure
*/
static struct sink *sink_new_ringbuf(const char *id, const char *description,
const char *file, int linenum, char **err_msg)
{
struct sink *sink;
struct proxy *p = NULL; // forward_px
/* allocate new proxy to handle forwards */
p = calloc(1, sizeof(*p));
if (!p) {
memprintf(err_msg, "out of memory");
goto err;
}
init_new_proxy(p);
sink_setup_proxy(p);
p->id = strdup(id);
p->conf.args.file = p->conf.file = strdup(file);
p->conf.args.line = p->conf.line = linenum;
sink = sink_new_buf(id, description, LOG_FORMAT_RAW, BUFSIZE);
if (!sink || sink->type != SINK_TYPE_BUFFER) {
memprintf(err_msg, "unable to create a new sink buffer for ring '%s'", id);
goto err;
}
sink->forward_px = p;
/* link sink forward_target to proxy */
p->parent = sink;
sink->forward_px = p;
return sink;
err:
free_proxy(p);
return NULL;
}
/* /*
* Parse "ring" section and create corresponding sink buffer. * Parse "ring" section and create corresponding sink buffer.
* *
@ -814,9 +857,8 @@ static void sink_free(struct sink *sink)
int cfg_parse_ring(const char *file, int linenum, char **args, int kwm) int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
{ {
int err_code = 0; int err_code = 0;
char *err_msg = NULL;
const char *inv; const char *inv;
size_t size = BUFSIZE;
struct proxy *p;
if (strcmp(args[0], "ring") == 0) { /* new ring section */ if (strcmp(args[0], "ring") == 0) { /* new ring section */
if (!*args[1]) { if (!*args[1]) {
@ -838,34 +880,22 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
goto err; goto err;
} }
cfg_sink = sink_new_buf(args[1], args[1], LOG_FORMAT_RAW, size); cfg_sink = sink_new_ringbuf(args[1], args[1], file, linenum, &err_msg);
if (!cfg_sink || cfg_sink->type != SINK_TYPE_BUFFER) { if (!cfg_sink) {
ha_alert("parsing [%s:%d] : unable to create a new sink buffer for ring '%s'.\n", file, linenum, args[1]); ha_alert("parsing [%s:%d] : %s.\n", file, linenum, err_msg);
ha_free(&err_msg);
err_code |= ERR_ALERT | ERR_FATAL; err_code |= ERR_ALERT | ERR_FATAL;
goto err; goto err;
} }
/* set maxlen value to 0 for now, we rely on this in postparsing /* set maxlen value to 0 for now, we rely on this in postparsing
* to know if it was explicitly set using the "maxlen" parameter * to know if it was explicitly set using the "maxlen" parameter
*/ */
cfg_sink->maxlen = 0; cfg_sink->maxlen = 0;
/* allocate new proxy to handle forwards */
p = calloc(1, sizeof *p);
if (!p) {
ha_alert("parsing [%s:%d] : out of memory.\n", file, linenum);
err_code |= ERR_ALERT | ERR_FATAL;
goto err;
}
init_new_proxy(p);
sink_setup_proxy(p);
p->parent = cfg_sink;
p->id = strdup(args[1]);
p->conf.args.file = p->conf.file = strdup(file);
p->conf.args.line = p->conf.line = linenum;
cfg_sink->forward_px = p;
} }
else if (strcmp(args[0], "size") == 0) { else if (strcmp(args[0], "size") == 0) {
size_t size;
if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) { if (!cfg_sink || (cfg_sink->type != SINK_TYPE_BUFFER)) {
ha_alert("parsing [%s:%d] : 'size' directive not usable with this type of sink.\n", file, linenum); ha_alert("parsing [%s:%d] : 'size' directive not usable with this type of sink.\n", file, linenum);
err_code |= ERR_ALERT | ERR_FATAL; err_code |= ERR_ALERT | ERR_FATAL;
@ -1087,33 +1117,37 @@ int cfg_parse_ring(const char *file, int linenum, char **args, int kwm)
*/ */
struct sink *sink_new_from_logsrv(struct logsrv *logsrv) struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
{ {
struct proxy *p = NULL;
struct sink *sink = NULL; struct sink *sink = NULL;
struct server *srv = NULL; struct server *srv = NULL;
struct sink_forward_target *sft = NULL; struct sink_forward_target *sft = NULL;
char *err_msg = NULL;
/* allocate new proxy to handle /* prepare description for the sink */
* forward to a stream server chunk_reset(&trash);
*/ chunk_printf(&trash, "created from logserver declared into '%s' at line %d", logsrv->conf.file, logsrv->conf.line);
p = calloc(1, sizeof *p);
if (!p) { /* allocate a new sink buffer */
sink = sink_new_ringbuf(logsrv->ring_name, trash.area, logsrv->conf.file, logsrv->conf.line, &err_msg);
if (!sink) {
ha_alert("%s.\n", err_msg);
ha_free(&err_msg);
goto error; goto error;
} }
init_new_proxy(p); /* disable sink->maxlen, we already have logsrv->maxlen */
sink_setup_proxy(p); sink->maxlen = ~0;
p->id = strdup(logsrv->ring_name);
p->conf.args.file = p->conf.file = strdup(logsrv->conf.file);
p->conf.args.line = p->conf.line = logsrv->conf.line;
/* Set default connect and server timeout */ /* set ring format from logsrv format */
p->timeout.connect = MS_TO_TICKS(1000); sink->fmt = logsrv->format;
p->timeout.server = MS_TO_TICKS(5000);
/* Set default connect and server timeout for sink forward proxy */
sink->forward_px->timeout.connect = MS_TO_TICKS(1000);
sink->forward_px->timeout.server = MS_TO_TICKS(5000);
/* allocate a new server to forward messages /* allocate a new server to forward messages
* from ring buffer * from ring buffer
*/ */
srv = new_server(p); srv = new_server(sink->forward_px);
if (!srv) if (!srv)
goto error; goto error;
@ -1129,12 +1163,6 @@ struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
if (srv_init_per_thr(srv) == -1) if (srv_init_per_thr(srv) == -1)
goto error; goto error;
/* the servers are linked backwards
* first into proxy
*/
srv->next = p->srv;
p->srv = srv;
/* allocate sink_forward_target descriptor */ /* allocate sink_forward_target descriptor */
sft = calloc(1, sizeof(*sft)); sft = calloc(1, sizeof(*sft));
if (!sft) if (!sft)
@ -1146,19 +1174,11 @@ struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
sft->ofs = ~0; sft->ofs = ~0;
HA_SPIN_INIT(&sft->lock); HA_SPIN_INIT(&sft->lock);
/* prepare description for the sink */ /* link srv with sink forward proxy: the servers are linked
chunk_reset(&trash); * backwards first into proxy
chunk_printf(&trash, "created from logserver declared into '%s' at line %d", logsrv->conf.file, logsrv->conf.line); */
srv->next = sink->forward_px->srv;
/* allocate a new sink buffer */ sink->forward_px->srv = srv;
sink = sink_new_buf(logsrv->ring_name, trash.area, logsrv->format, BUFSIZE);
if (!sink || sink->type != SINK_TYPE_BUFFER) {
goto error;
}
/* link sink_forward_target to proxy */
sink->forward_px = p;
p->parent = sink;
/* insert into sink_forward_targets /* insert into sink_forward_targets
* list into sink * list into sink
@ -1172,41 +1192,25 @@ struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
/* should never fail since there is /* should never fail since there is
* only one reader * only one reader
*/ */
goto error; goto error_final;
} }
/* initialize sink buffer forwarding */ /* initialize sink buffer forwarding */
if (!sink_init_forward(sink)) if (!sink_init_forward(sink))
goto error; goto error_final;
/* reset familyt of logsrv to consider the ring buffer target */ /* reset familyt of logsrv to consider the ring buffer target */
logsrv->addr.ss_family = AF_UNSPEC; logsrv->addr.ss_family = AF_UNSPEC;
return sink; return sink;
error: error:
if (srv) srv_drop(srv);
srv_drop(srv);
if (p) {
if (p->id)
free(p->id);
if (p->conf.file)
free(p->conf.file);
free(p);
}
if (sft) if (sft)
free(sft); free(sft);
if (sink) { error_final:
ring_free(sink->ctx.ring); sink_free(sink);
LIST_DELETE(&sink->sink_list);
free(sink->name);
free(sink->desc);
free(sink);
}
return NULL; return NULL;
} }