MINOR: sink: merge sink_forward_io_handler() with sink_forward_oc_io_handler()

Now that sink_forward_oc_io_handler() and sink_forward_io_handler() were
unified again thanks to the previous commit, let's take a chance to merge
code that is common to both functions in order to ease code maintenance.

Let's add _sink_forward_io_handler() internal function which takes the
applet and a message handler as argument: sink_forward_io_handler() and
sink_forward_oc_io_handler() leverage this internal function by passing
the correct message handler for the desired format.
This commit is contained in:
Aurelien DARRAGON 2024-07-23 19:12:28 +02:00
parent f2848e6146
commit 10811fdfd6

View File

@ -345,11 +345,8 @@ void sink_setup_proxy(struct proxy *px)
sink_proxies_list = px;
}
/*
* IO Handler to handle message push to syslog tcp server.
* It takes its context from appctx->svcctx.
*/
static void sink_forward_io_handler(struct appctx *appctx)
static void _sink_forward_io_handler(struct appctx *appctx,
ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len))
{
struct stconn *sc = appctx_sc(appctx);
struct sink_forward_target *sft = appctx->svcctx;
@ -384,7 +381,7 @@ static void sink_forward_io_handler(struct appctx *appctx)
MT_LIST_DELETE(&appctx->wait_entry);
ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, applet_append_line);
ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, msg_handler);
if (ret) {
/* let's be woken up once new data arrive */
@ -410,68 +407,23 @@ close:
se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI);
}
/*
* IO Handler to handle message push to syslog tcp server.
* It takes its context from appctx->svcctx.
*/
static inline void sink_forward_io_handler(struct appctx *appctx)
{
_sink_forward_io_handler(appctx, applet_append_line);
}
/*
* IO Handler to handle message push to syslog tcp server
* using octet counting frames
* It takes its context from appctx->svcctx.
*/
static void sink_forward_oc_io_handler(struct appctx *appctx)
static inline void sink_forward_oc_io_handler(struct appctx *appctx)
{
struct stconn *sc = appctx_sc(appctx);
struct sink_forward_target *sft = appctx->svcctx;
struct sink *sink = sft->sink;
struct ring *ring = sink->ctx.ring;
size_t ofs, last_ofs;
int ret = 0;
if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR))))
goto out;
/* if stopping was requested, close immediately */
if (unlikely(stopping))
goto close;
/* if the connection is not established, inform the stream that we want
* to be notified whenever the connection completes.
*/
if (sc_opposite(sc)->state < SC_ST_EST) {
applet_need_more_data(appctx);
se_need_remote_conn(appctx->sedesc);
applet_have_more_data(appctx);
goto out;
}
HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
if (appctx != sft->appctx) {
HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
goto close;
}
MT_LIST_DELETE(&appctx->wait_entry);
ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, syslog_applet_append_event);
if (ret) {
/* let's be woken up once new data arrive */
MT_LIST_APPEND(&ring->waiters, &appctx->wait_entry);
ofs = ring_tail(ring);
if (ofs != last_ofs) {
/* more data was added into the ring between the
* unlock and the lock, and the writer might not
* have seen us. We need to reschedule a read.
*/
applet_have_more_data(appctx);
} else
applet_have_no_more_data(appctx);
}
HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
out:
/* always drain data from server */
co_skip(sc_oc(sc), sc_oc(sc)->output);
return;
close:
se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI);
_sink_forward_io_handler(appctx, syslog_applet_append_event);
}
void __sink_forward_session_deinit(struct sink_forward_target *sft)