MEDIUM: sink: add sink_finalize() function

To further clean the code and remove duplication, some sink postparsing
and sink->sft finalization is now performed in a dedicated function
named sink_finalize().
This commit is contained in:
Aurelien DARRAGON 2023-07-10 15:04:40 +02:00 committed by Christopher Faulet
parent b2879e3502
commit 8e6339aa29

View File

@ -848,6 +848,67 @@ static struct sink *sink_new_ringbuf(const char *id, const char *description,
return NULL;
}
/* Finalize sink struct to ensure configuration consistency and
* allocate final struct members
*
* Returns ERR_NONE on success, ERR_WARN on warning
* Returns a composition of ERR_ALERT, ERR_ABORT, ERR_FATAL on failure
*/
static int sink_finalize(struct sink *sink)
{
int err_code = ERR_NONE;
struct server *srv;
if (sink && (sink->type == SINK_TYPE_BUFFER)) {
if (!sink->maxlen)
sink->maxlen = ~0; // maxlen not set: no implicit truncation
else if (sink->maxlen > ring_max_payload(sink->ctx.ring)) {
/* maxlen set by user however it doesn't fit: set to max value */
ha_warning("ring '%s' event max length '%u' exceeds max payload size, forced to '%lu'.\n",
sink->name, sink->maxlen, (unsigned long)ring_max_payload(sink->ctx.ring));
sink->maxlen = ring_max_payload(sink->ctx.ring);
err_code |= ERR_WARN;
}
/* prepare forward server descriptors */
if (sink->forward_px) {
srv = sink->forward_px->srv;
while (srv) {
struct sink_forward_target *sft;
/* allocate sink_forward_target descriptor */
sft = calloc(1, sizeof(*sft));
if (!sft) {
ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n", srv->id, sink->name);
err_code |= ERR_ALERT | ERR_FATAL;
break;
}
sft->srv = srv;
sft->appctx = NULL;
sft->ofs = ~0; /* init ring offset */
sft->sink = sink;
sft->next = sink->sft;
HA_SPIN_INIT(&sft->lock);
/* mark server attached to the ring */
if (!ring_attach(sink->ctx.ring)) {
ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, sink->name);
err_code |= ERR_ALERT | ERR_FATAL;
ha_free(&sft);
break;
}
sink->sft = sft;
srv = srv->next;
}
if (sink_init_forward(sink) == 0) {
ha_alert("error when trying to initialize sink buffer forwarding.\n");
err_code |= ERR_ALERT | ERR_FATAL;
}
}
}
return err_code;
}
/*
* Parse "ring" section and create corresponding sink buffer.
*
@ -1119,7 +1180,6 @@ struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
{
struct sink *sink = NULL;
struct server *srv = NULL;
struct sink_forward_target *sft = NULL;
char *err_msg = NULL;
/* prepare description for the sink */
@ -1135,7 +1195,7 @@ struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
}
/* disable sink->maxlen, we already have logsrv->maxlen */
sink->maxlen = ~0;
sink->maxlen = 0;
/* set ring format from logsrv format */
sink->fmt = logsrv->format;
@ -1163,40 +1223,13 @@ struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
if (srv_init_per_thr(srv) == -1)
goto error;
/* allocate sink_forward_target descriptor */
sft = calloc(1, sizeof(*sft));
if (!sft)
goto error;
/* init sink_forward_target offset */
sft->srv = srv;
sft->appctx = NULL;
sft->ofs = ~0;
HA_SPIN_INIT(&sft->lock);
/* link srv with sink forward proxy: the servers are linked
* backwards first into proxy
*/
srv->next = sink->forward_px->srv;
sink->forward_px->srv = srv;
/* insert into sink_forward_targets
* list into sink
*/
sft->sink = sink;
sft->next = sink->sft;
sink->sft = sft;
/* mark server as an attached reader to the ring */
if (!ring_attach(sink->ctx.ring)) {
/* should never fail since there is
* only one reader
*/
goto error_final;
}
/* initialize sink buffer forwarding */
if (!sink_init_forward(sink))
if (sink_finalize(sink) & ERR_CODE)
goto error_final;
/* reset familyt of logsrv to consider the ring buffer target */
@ -1206,9 +1239,6 @@ struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
error:
srv_drop(srv);
if (sft)
free(sft);
error_final:
sink_free(sink);
@ -1223,56 +1253,9 @@ struct sink *sink_new_from_logsrv(struct logsrv *logsrv)
*/
int cfg_post_parse_ring()
{
int err_code = 0;
struct server *srv;
int err_code;
if (cfg_sink && (cfg_sink->type == SINK_TYPE_BUFFER)) {
if (!cfg_sink->maxlen)
cfg_sink->maxlen = ~0; // maxlen not set: no implicit truncation
else if (cfg_sink->maxlen > ring_max_payload(cfg_sink->ctx.ring)) {
/* maxlen set by user however it doesn't fit: set to max value */
ha_warning("ring '%s' event max length '%u' exceeds max payload size, forced to '%lu'.\n",
cfg_sink->name, cfg_sink->maxlen, (unsigned long)ring_max_payload(cfg_sink->ctx.ring));
cfg_sink->maxlen = ring_max_payload(cfg_sink->ctx.ring);
err_code |= ERR_WARN;
}
/* prepare forward server descriptors */
if (cfg_sink->forward_px) {
srv = cfg_sink->forward_px->srv;
while (srv) {
struct sink_forward_target *sft;
/* allocate sink_forward_target descriptor */
sft = calloc(1, sizeof(*sft));
if (!sft) {
ha_alert("memory allocation error initializing server '%s' in ring '%s'.\n",srv->id, cfg_sink->name);
err_code |= ERR_ALERT | ERR_FATAL;
break;
}
sft->srv = srv;
sft->appctx = NULL;
sft->ofs = ~0; /* init ring offset */
sft->sink = cfg_sink;
sft->next = cfg_sink->sft;
HA_SPIN_INIT(&sft->lock);
/* mark server attached to the ring */
if (!ring_attach(cfg_sink->ctx.ring)) {
ha_alert("server '%s' sets too many watchers > 255 on ring '%s'.\n", srv->id, cfg_sink->name);
err_code |= ERR_ALERT | ERR_FATAL;
ha_free(&sft);
break;
}
cfg_sink->sft = sft;
srv = srv->next;
}
if (sink_init_forward(cfg_sink) == 0) {
ha_alert("error when trying to initialize sink buffer forwarding.\n");
err_code |= ERR_ALERT | ERR_FATAL;
}
}
}
err_code = sink_finalize(cfg_sink);
cfg_sink = NULL;
return err_code;