diff --git a/src/sink.c b/src/sink.c index b1e63e1d8..05ed71807 100644 --- a/src/sink.c +++ b/src/sink.c @@ -345,11 +345,8 @@ void sink_setup_proxy(struct proxy *px) sink_proxies_list = px; } -/* - * IO Handler to handle message push to syslog tcp server. - * It takes its context from appctx->svcctx. - */ -static void sink_forward_io_handler(struct appctx *appctx) +static void _sink_forward_io_handler(struct appctx *appctx, + ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len)) { struct stconn *sc = appctx_sc(appctx); struct sink_forward_target *sft = appctx->svcctx; @@ -384,7 +381,7 @@ static void sink_forward_io_handler(struct appctx *appctx) MT_LIST_DELETE(&appctx->wait_entry); - ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, applet_append_line); + ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, msg_handler); if (ret) { /* let's be woken up once new data arrive */ @@ -410,68 +407,23 @@ close: se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI); } +/* + * IO Handler to handle message push to syslog tcp server. + * It takes its context from appctx->svcctx. + */ +static inline void sink_forward_io_handler(struct appctx *appctx) +{ + _sink_forward_io_handler(appctx, applet_append_line); +} + /* * IO Handler to handle message push to syslog tcp server * using octet counting frames * It takes its context from appctx->svcctx. */ -static void sink_forward_oc_io_handler(struct appctx *appctx) +static inline void sink_forward_oc_io_handler(struct appctx *appctx) { - struct stconn *sc = appctx_sc(appctx); - struct sink_forward_target *sft = appctx->svcctx; - struct sink *sink = sft->sink; - struct ring *ring = sink->ctx.ring; - size_t ofs, last_ofs; - int ret = 0; - - if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) - goto out; - - /* if stopping was requested, close immediately */ - if (unlikely(stopping)) - goto close; - - /* if the connection is not established, inform the stream that we want - * to be notified whenever the connection completes. - */ - if (sc_opposite(sc)->state < SC_ST_EST) { - applet_need_more_data(appctx); - se_need_remote_conn(appctx->sedesc); - applet_have_more_data(appctx); - goto out; - } - - HA_SPIN_LOCK(SFT_LOCK, &sft->lock); - if (appctx != sft->appctx) { - HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); - goto close; - } - - MT_LIST_DELETE(&appctx->wait_entry); - - ret = ring_dispatch_messages(ring, appctx, &sft->ofs, &last_ofs, 0, syslog_applet_append_event); - if (ret) { - /* let's be woken up once new data arrive */ - MT_LIST_APPEND(&ring->waiters, &appctx->wait_entry); - ofs = ring_tail(ring); - if (ofs != last_ofs) { - /* more data was added into the ring between the - * unlock and the lock, and the writer might not - * have seen us. We need to reschedule a read. - */ - applet_have_more_data(appctx); - } else - applet_have_no_more_data(appctx); - } - HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock); - - out: - /* always drain data from server */ - co_skip(sc_oc(sc), sc_oc(sc)->output); - return; - -close: - se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI); + _sink_forward_io_handler(appctx, syslog_applet_append_event); } void __sink_forward_session_deinit(struct sink_forward_target *sft)