mirror of
https://git.haproxy.org/git/haproxy.git/
synced 2025-08-07 07:37:02 +02:00
MEDIUM: ring: add new srv statement to support octet counting forward
log-proto <logproto> The "log-proto" specifies the protocol used to forward event messages to a server configured in a ring section. Possible values are "legacy" and "octet-count" corresponding respectively to "Non-transparent-framing" and "Octet counting" in rfc6587. "legacy" is the default. Notes: a separated io_handler was created to avoid per messages test and to prepare code to set different log protocols such as request- response based ones.
This commit is contained in:
parent
494c505703
commit
975564784f
@ -2606,7 +2606,8 @@ server <name> <address> [param*]
|
|||||||
respond, it will prevent old messages from being purged and may block new
|
respond, it will prevent old messages from being purged and may block new
|
||||||
messages from being inserted into the ring. The proper way to send messages
|
messages from being inserted into the ring. The proper way to send messages
|
||||||
to multiple servers is to use one distinct ring per log server, not to
|
to multiple servers is to use one distinct ring per log server, not to
|
||||||
attach multiple servers to the same ring.
|
attach multiple servers to the same ring. Note that specific server directive
|
||||||
|
"log-proto" is used to set the protocol used to send messages.
|
||||||
|
|
||||||
size <size>
|
size <size>
|
||||||
This is the optional size in bytes for the ring-buffer. Default value is
|
This is the optional size in bytes for the ring-buffer. Default value is
|
||||||
@ -2639,7 +2640,7 @@ timeout server <timeout>
|
|||||||
size 32764
|
size 32764
|
||||||
timeout connect 5s
|
timeout connect 5s
|
||||||
timeout server 10s
|
timeout server 10s
|
||||||
server mysyslogsrv 127.0.0.1:6514
|
server mysyslogsrv 127.0.0.1:6514 log-proto octet-count
|
||||||
|
|
||||||
|
|
||||||
4. Proxies
|
4. Proxies
|
||||||
@ -13080,6 +13081,12 @@ downinter <delay>
|
|||||||
global "spread-checks" keyword. This makes sense for instance when a lot
|
global "spread-checks" keyword. This makes sense for instance when a lot
|
||||||
of backends use the same servers.
|
of backends use the same servers.
|
||||||
|
|
||||||
|
log-proto <logproto>
|
||||||
|
The "log-proto" specifies the protocol used to forward event messages to
|
||||||
|
a server configured in a ring section. Possible values are "legacy"
|
||||||
|
and "octet-count" corresponding respectively to "Non-transparent-framing"
|
||||||
|
and "Octet counting" in rfc6587. "legacy" is the default.
|
||||||
|
|
||||||
maxconn <maxconn>
|
maxconn <maxconn>
|
||||||
The "maxconn" parameter specifies the maximal number of concurrent
|
The "maxconn" parameter specifies the maximal number of concurrent
|
||||||
connections that will be sent to this server. If the number of incoming
|
connections that will be sent to this server. If the number of incoming
|
||||||
|
@ -177,6 +177,12 @@ enum srv_initaddr {
|
|||||||
#define SRV_SSL_O_EARLY_DATA 0x400 /* Allow using early data */
|
#define SRV_SSL_O_EARLY_DATA 0x400 /* Allow using early data */
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
/* log servers ring's protocols options */
|
||||||
|
enum srv_log_proto {
|
||||||
|
SRV_LOG_PROTO_LEGACY, // messages on TCP separated by LF
|
||||||
|
SRV_LOG_PROTO_OCTET_COUNTING, // TCP frames: MSGLEN SP MSG
|
||||||
|
};
|
||||||
|
|
||||||
/* The server names dictionary */
|
/* The server names dictionary */
|
||||||
extern struct dict server_name_dict;
|
extern struct dict server_name_dict;
|
||||||
|
|
||||||
@ -291,6 +297,7 @@ struct server {
|
|||||||
char *hostname; /* server hostname */
|
char *hostname; /* server hostname */
|
||||||
struct sockaddr_storage init_addr; /* plain IP address specified on the init-addr line */
|
struct sockaddr_storage init_addr; /* plain IP address specified on the init-addr line */
|
||||||
unsigned int init_addr_methods; /* initial address setting, 3-bit per method, ends at 0, enough to store 10 entries */
|
unsigned int init_addr_methods; /* initial address setting, 3-bit per method, ends at 0, enough to store 10 entries */
|
||||||
|
enum srv_log_proto log_proto; /* used proto to emmit messages on server lines from ring section */
|
||||||
|
|
||||||
#ifdef USE_OPENSSL
|
#ifdef USE_OPENSSL
|
||||||
char *sni_expr; /* Temporary variable to store a sample expression for SNI */
|
char *sni_expr; /* Temporary variable to store a sample expression for SNI */
|
||||||
|
14
src/server.c
14
src/server.c
@ -2275,6 +2275,20 @@ int parse_server(const char *file, int linenum, char **args, struct proxy *curpr
|
|||||||
newsrv->uweight = newsrv->iweight = w;
|
newsrv->uweight = newsrv->iweight = w;
|
||||||
cur_arg += 2;
|
cur_arg += 2;
|
||||||
}
|
}
|
||||||
|
else if (!strcmp(args[cur_arg], "log-proto")) {
|
||||||
|
if (!strcmp(args[cur_arg + 1], "legacy"))
|
||||||
|
newsrv->log_proto = SRV_LOG_PROTO_LEGACY;
|
||||||
|
else if (!strcmp(args[cur_arg + 1], "octet-count"))
|
||||||
|
newsrv->log_proto = SRV_LOG_PROTO_OCTET_COUNTING;
|
||||||
|
else {
|
||||||
|
ha_alert("parsing [%s:%d]: '%s' expects one of 'legacy' or "
|
||||||
|
"'octet-count' but got '%s'\n",
|
||||||
|
file, linenum, args[cur_arg], args[cur_arg + 1]);
|
||||||
|
err_code |= ERR_ALERT | ERR_FATAL;
|
||||||
|
goto out;
|
||||||
|
}
|
||||||
|
cur_arg += 2;
|
||||||
|
}
|
||||||
else if (!strcmp(args[cur_arg], "minconn")) {
|
else if (!strcmp(args[cur_arg], "minconn")) {
|
||||||
newsrv->minconn = atol(args[cur_arg + 1]);
|
newsrv->minconn = atol(args[cur_arg + 1]);
|
||||||
cur_arg += 2;
|
cur_arg += 2;
|
||||||
|
157
src/sink.c
157
src/sink.c
@ -470,6 +470,150 @@ static void sink_forward_io_handler(struct appctx *appctx)
|
|||||||
si_ic(si)->flags |= CF_READ_NULL;
|
si_ic(si)->flags |= CF_READ_NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IO Handler to handle message push to syslog tcp server
|
||||||
|
* using octet counting frames
|
||||||
|
*/
|
||||||
|
static void sink_forward_oc_io_handler(struct appctx *appctx)
|
||||||
|
{
|
||||||
|
struct stream_interface *si = appctx->owner;
|
||||||
|
struct stream *s = si_strm(si);
|
||||||
|
struct sink *sink = strm_fe(s)->parent;
|
||||||
|
struct sink_forward_target *sft = appctx->ctx.sft.ptr;
|
||||||
|
struct ring *ring = sink->ctx.ring;
|
||||||
|
struct buffer *buf = &ring->buf;
|
||||||
|
uint64_t msg_len;
|
||||||
|
size_t len, cnt, ofs;
|
||||||
|
int ret = 0;
|
||||||
|
char *p;
|
||||||
|
|
||||||
|
/* if stopping was requested, close immediatly */
|
||||||
|
if (unlikely(stopping))
|
||||||
|
goto close;
|
||||||
|
|
||||||
|
/* for rex because it seems reset to timeout
|
||||||
|
* and we don't want expire on this case
|
||||||
|
* with a syslog server
|
||||||
|
*/
|
||||||
|
si_oc(si)->rex = TICK_ETERNITY;
|
||||||
|
/* rto should not change but it seems the case */
|
||||||
|
si_oc(si)->rto = TICK_ETERNITY;
|
||||||
|
|
||||||
|
/* an error was detected */
|
||||||
|
if (unlikely(si_ic(si)->flags & (CF_WRITE_ERROR|CF_SHUTW)))
|
||||||
|
goto close;
|
||||||
|
|
||||||
|
/* con closed by server side */
|
||||||
|
if ((si_oc(si)->flags & CF_SHUTW))
|
||||||
|
goto close;
|
||||||
|
|
||||||
|
/* if the connection is not established, inform the stream that we want
|
||||||
|
* to be notified whenever the connection completes.
|
||||||
|
*/
|
||||||
|
if (si_opposite(si)->state < SI_ST_EST) {
|
||||||
|
si_cant_get(si);
|
||||||
|
si_rx_conn_blk(si);
|
||||||
|
si_rx_endp_more(si);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
HA_SPIN_LOCK(SFT_LOCK, &sft->lock);
|
||||||
|
if (appctx != sft->appctx) {
|
||||||
|
HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
|
||||||
|
goto close;
|
||||||
|
}
|
||||||
|
ofs = sft->ofs;
|
||||||
|
|
||||||
|
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
|
||||||
|
LIST_DEL_INIT(&appctx->wait_entry);
|
||||||
|
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
|
||||||
|
|
||||||
|
HA_RWLOCK_RDLOCK(LOGSRV_LOCK, &ring->lock);
|
||||||
|
|
||||||
|
/* explanation for the initialization below: it would be better to do
|
||||||
|
* this in the parsing function but this would occasionally result in
|
||||||
|
* dropped events because we'd take a reference on the oldest message
|
||||||
|
* and keep it while being scheduled. Thus instead let's take it the
|
||||||
|
* first time we enter here so that we have a chance to pass many
|
||||||
|
* existing messages before grabbing a reference to a location. This
|
||||||
|
* value cannot be produced after initialization.
|
||||||
|
*/
|
||||||
|
if (unlikely(ofs == ~0)) {
|
||||||
|
ofs = 0;
|
||||||
|
|
||||||
|
HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
|
||||||
|
ofs += ring->ofs;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* we were already there, adjust the offset to be relative to
|
||||||
|
* the buffer's head and remove us from the counter.
|
||||||
|
*/
|
||||||
|
ofs -= ring->ofs;
|
||||||
|
BUG_ON(ofs >= buf->size);
|
||||||
|
HA_ATOMIC_SUB(b_peek(buf, ofs), 1);
|
||||||
|
|
||||||
|
/* in this loop, ofs always points to the counter byte that precedes
|
||||||
|
* the message so that we can take our reference there if we have to
|
||||||
|
* stop before the end (ret=0).
|
||||||
|
*/
|
||||||
|
if (si_opposite(si)->state == SI_ST_EST) {
|
||||||
|
ret = 1;
|
||||||
|
while (ofs + 1 < b_data(buf)) {
|
||||||
|
cnt = 1;
|
||||||
|
len = b_peek_varint(buf, ofs + cnt, &msg_len);
|
||||||
|
if (!len)
|
||||||
|
break;
|
||||||
|
cnt += len;
|
||||||
|
BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
|
||||||
|
|
||||||
|
chunk_reset(&trash);
|
||||||
|
p = ulltoa(msg_len, trash.area, b_size(&trash));
|
||||||
|
if (p) {
|
||||||
|
trash.data = (p - trash.area) + 1;
|
||||||
|
*p = ' ';
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!p || (trash.data + msg_len > b_size(&trash))) {
|
||||||
|
/* too large a message to ever fit, let's skip it */
|
||||||
|
ofs += cnt + msg_len;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
trash.data += b_getblk(buf, p + 1, msg_len, ofs + cnt);
|
||||||
|
|
||||||
|
if (ci_putchk(si_ic(si), &trash) == -1) {
|
||||||
|
si_rx_room_blk(si);
|
||||||
|
ret = 0;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
ofs += cnt + msg_len;
|
||||||
|
}
|
||||||
|
|
||||||
|
HA_ATOMIC_ADD(b_peek(buf, ofs), 1);
|
||||||
|
ofs += ring->ofs;
|
||||||
|
sft->ofs = ofs;
|
||||||
|
}
|
||||||
|
HA_RWLOCK_RDUNLOCK(LOGSRV_LOCK, &ring->lock);
|
||||||
|
|
||||||
|
if (ret) {
|
||||||
|
/* let's be woken up once new data arrive */
|
||||||
|
HA_RWLOCK_WRLOCK(LOGSRV_LOCK, &ring->lock);
|
||||||
|
LIST_ADDQ(&ring->waiters, &appctx->wait_entry);
|
||||||
|
HA_RWLOCK_WRUNLOCK(LOGSRV_LOCK, &ring->lock);
|
||||||
|
si_rx_endp_done(si);
|
||||||
|
}
|
||||||
|
HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
|
||||||
|
|
||||||
|
/* always drain data from server */
|
||||||
|
co_skip(si_oc(si), si_oc(si)->output);
|
||||||
|
return;
|
||||||
|
|
||||||
|
close:
|
||||||
|
si_shutw(si);
|
||||||
|
si_shutr(si);
|
||||||
|
si_ic(si)->flags |= CF_READ_NULL;
|
||||||
|
}
|
||||||
|
|
||||||
void __sink_forward_session_deinit(struct sink_forward_target *sft)
|
void __sink_forward_session_deinit(struct sink_forward_target *sft)
|
||||||
{
|
{
|
||||||
struct stream_interface *si;
|
struct stream_interface *si;
|
||||||
@ -520,6 +664,13 @@ static struct applet sink_forward_applet = {
|
|||||||
.release = sink_forward_session_release,
|
.release = sink_forward_session_release,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static struct applet sink_forward_oc_applet = {
|
||||||
|
.obj_type = OBJ_TYPE_APPLET,
|
||||||
|
.name = "<SINKFWDOC>", /* used for logging */
|
||||||
|
.fct = sink_forward_oc_io_handler,
|
||||||
|
.release = sink_forward_session_release,
|
||||||
|
};
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Create a new peer session in assigned state (connect will start automatically)
|
* Create a new peer session in assigned state (connect will start automatically)
|
||||||
*/
|
*/
|
||||||
@ -529,8 +680,12 @@ static struct appctx *sink_forward_session_create(struct sink *sink, struct sink
|
|||||||
struct appctx *appctx;
|
struct appctx *appctx;
|
||||||
struct session *sess;
|
struct session *sess;
|
||||||
struct stream *s;
|
struct stream *s;
|
||||||
|
struct applet *applet = &sink_forward_applet;
|
||||||
|
|
||||||
appctx = appctx_new(&sink_forward_applet, tid_bit);
|
if (sft->srv->log_proto == SRV_LOG_PROTO_OCTET_COUNTING)
|
||||||
|
applet = &sink_forward_oc_applet;
|
||||||
|
|
||||||
|
appctx = appctx_new(applet, tid_bit);
|
||||||
if (!appctx)
|
if (!appctx)
|
||||||
goto out_close;
|
goto out_close;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user