diff --git a/Makefile b/Makefile index b2b748c51..b68d2c596 100644 --- a/Makefile +++ b/Makefile @@ -144,8 +144,8 @@ SMALL_OPTS = #### Debug settings # You can enable debugging on specific code parts by setting DEBUG=-DDEBUG_xxx. # Currently defined DEBUG macros include DEBUG_FULL, DEBUG_MEMORY, DEBUG_FSM, -# DEBUG_HASH and DEBUG_AUTH. Please check sources for exact meaning or do not -# use at all. +# DEBUG_HASH, DEBUG_AUTH and DEBUG_SPOE. Please check sources for exact meaning +# or do not use at all. DEBUG = #### Trace options @@ -778,7 +778,7 @@ OBJS = src/haproxy.o src/base64.o src/protocol.o \ src/acl.o src/sample.o src/memory.o src/freq_ctr.o src/auth.o src/proto_udp.o \ src/compression.o src/payload.o src/hash.o src/pattern.o src/map.o \ src/namespace.o src/mailers.o src/dns.o src/vars.o src/filters.o \ - src/flt_http_comp.o src/flt_trace.o + src/flt_http_comp.o src/flt_trace.o src/flt_spoe.o EBTREE_OBJS = $(EBTREE_DIR)/ebtree.o \ $(EBTREE_DIR)/eb32tree.o $(EBTREE_DIR)/eb64tree.o \ diff --git a/doc/SPOE.txt b/doc/SPOE.txt new file mode 100644 index 000000000..538bb2684 --- /dev/null +++ b/doc/SPOE.txt @@ -0,0 +1,829 @@ + ----------------------------------------------- + Stream Processing Offload Engine (SPOE) + Version 1.0 + ( Last update: 2016-11-07 ) + ----------------------------------------------- + Author : Christopher Faulet + Contact : cfaulet at haproxy dot com + + +SUMMARY +-------- + + 0. Terms + 1. Introduction + 2. SPOE configuration + 2.1. SPOE scope + 2.2. "spoe-agent" section + 2.3. "spoe-message" section + 2.4. Example + 3. SPOP specification + 3.1. Data types + 3.2. Frames + 3.2.1. Frame capabilities + 3.2.2. Frame types overview + 3.2.3. Workflow + 3.2.4. Frame: HAPROXY-HELLO + 3.2.5. Frame: AGENT-HELLO + 3.2.6. Frame: NOTIFY + 3.2.7. Frame: ACK + 3.2.8. Frame: HAPROXY-DISCONNECT + 3.2.9. Frame: AGENT-DISCONNECT + 3.3. Events & messages + 3.4. Actions + 3.5. Error & timeouts + + +0. Terms +--------- + +* SPOE : Stream Processing Offload Engine. + + A SPOE is a filter talking to servers managed ba a SPOA to offload the + stream processing. An engine is attached to a proxy. A proxy can have + several engine. Each engine is linked to an agent and only one. + +* SPOA : Stream Processing Offload Agent. + + A SPOA is a service that will receive info from a SPOE to offload the + stream processing. An agent manages several servers. It uses a backend to + reference all of them. By extension, these servers can also be called + agents. + +* SPOP : Stream Processing Offload Protocol, used by SPOEs to talk to SPOA + servers. + + This protocol is used by engines to talk to agents. It is an in-house + binary protocol described in this documentation. + + +1. Introduction +---------------- + +SPOE is a feature introduced in HAProxy 1.7. It makes possible the +communication with external components to retrieve some info. The idea started +with the problems caused by most ldap libs not working fine in event-driven +systems (often at least the connect() is blocking). So, it is hard to properly +implement Single Sign On solution (SSO) in HAProxy. The SPOE will ease this +kind of processing, or we hope so. + +Now, the aim of SPOE is to allow any kind of offloading on the streams. First +releases, besides being experimental, won't do lot of things. As we will see, +there are few handled events and even less actions supported. Actually, for +now, the SPOE can offload the processing before "tcp-request content", +"tcp-response content", "http-request" and "http-response" rules. And it only +supports variables definition. But, in spite of these limited features, we can +easily imagine to implement SSO solution, ip reputation or ip geolocation +services. + + +2. SPOE configuration +---------------------- + +Because SPOE is implemented as a filter, To use it, you must declare a "filter +spoe" line in a proxy section (frontend/backend/listen) : + + frontend my-front + ... + filter spoe [engine ] config + ... + +The "config" parameter is mandatory. It specififies the SPOE configuration +file. The engine name is optional. It can be set to declare the scope to use in +the SPOE configuration. So it is possible to use the same SPOE configuration +for several engines. If no name is provided, the SPOE configuration must not +contain any scope directive. + +We use a separate configuration file on purpose. By commenting SPOE filter +line, you completly disable the feature, including the parsing of sections +reserved to SPOE. This is also a way to keep the HAProxy configuration clean. + +A SPOE configuration file must contains, at least, the SPOA configuration +("spoe-agent" section) and SPOE messages ("spoe-message" section) attached to +this agent. Unused messages (not reference in "spoe-agent" section) will be +ignored. + +IMPORTANT : The configuration of a SPOE filter must be located in a dedicated +file. But the backend used by a SPOA must be declared in HAProxy configuration +file. + +2.1. SPOE scope +------------------------- + +If you specify an engine name on the SPOE filter line, then you need to define +scope in the SPOE configuration with the same name. You can have several SPOE +scope in the same file. In each scope, you must define one and only one +"spoe-agent" section to configure the SPOA linked to your SPOE and several +"spoe-message" sections to describe messages sent to servers mananger by your +SPOA. + +A SPOE scope starts with this kind of line : + + [] + +where is the same engine name specified on the SPOE filter line. The +scope ends when the file ends or when another scope is found. + + Example : + [my-first-engine] + spoe-agent my-agent + ... + spoe-message msg1 + ... + spoe-message msg2 + ... + + [my-second-engine] + ... + +If no engine name is provided on the SPOE filter line, no SPOE scope must be +found in the SPOE configuration file. All the file is considered to be in the +same anonymous and implicit scope. + +2.2. "spoe-agent" section +-------------------------- + +For each engine, you must define one and only one "spoe-agent" section. In this +section, you will declare SPOE messages and the backend you will use. You will +also set timeouts and options to customize your agent's behaviour. + + +spoe-agent + Create a new SPOA with the name . It must have one and only one + "spoe-agent" definition by SPOE scope. + + Arguments : + is the name of the agent section. + + following keywords are supported : + - messages + - option var-prefix + - timeout hello|idle|ack + - use-backend + + +messages ... + Declare the list of SPOE messages that an agent will handle. + + Arguments : + is the name of a SPOE message. + + Messages declared here must be found in the same engine scope, else an error + is triggered during the configuration parsing. You can have many "messages" + lines. + + See also: "spoe-message" section. + + +option var-prefix + Define the prefix used when variables are set by an agent. + + Arguments : + + is the prefix used to limit the scope of variables set by an + agent. + + To avoid conflict with other variables defined by HAProxy, all variables + names will be prefixed. By default, the "spoe-agent" name is used. This + option can be used to customize it. + + The prefix will be added between the variable scope and its name, separated + by a '.'. It may only contain characters 'a-z', 'A-Z', '0-9', '.' and '_', as + for variables name. In HAProxy configuration, you need to use this prefix as + a part of the variables name. For example, if an agent define the variable + "myvar" in the "txn" scope, with the prefix "my_spoe_pfx", then you should + use "txn.my_spoe_pfx.myvar" name in your HAProxy configuration. + + An agent will never set new variables at runtime. It can only set new value + for existing ones. + + +timeout ack + Set the maximum time to wait for an agent to receive the acknowledgement to a + NOTIFY frame. + + Arguments : + is the timeout value specified in milliseconds by default, but + can be in any other unit if the number is suffixed by the unit, + as explained at the top of this document. + + +timeout hello + Set the maximum time to wait for an agent to receive the AGENT-HELLO frame. + + Arguments : + is the timeout value specified in milliseconds by default, but + can be in any other unit if the number is suffixed by the unit, + as explained at the top of this document. + + This timeout is an applicative timeout. It differ from "timeout connect" + defined on backends. + + +timeout idle + Set the maximum time to wait for an agent to close an idle connection. + + Arguments : + is the timeout value specified in milliseconds by default, but + can be in any other unit if the number is suffixed by the unit, + as explained at the top of this document. + + +use-backend + Specify the backend to use. It must be defined. + + Arguments : + is the name of a valid "backend" section. + + +2.3. "spoe-message" section +---------------------------- + +To offload the stream processing, SPOE will send messages with specific +information at a specific moment in the stream life and will wait for +corresponding replies to know what to do. + + +spoe-message + Create a new SPOE message with the name . + + Arguments : + is the name of the SPOE message. + + Here you define a message that can be referenced in a "spoe-agent" + section. Following keywords are supported : + - args + - event + + See also: "spoe-agent" section. + + +args [name=] ... + Define arguments passed into the SPOE message. + + Arguments : + is a sample expression. + + When the message is processed, if a sample expression is not available, it is + set to NULL. Arguments are processed in their declaration order and added in + the message in that order. It is possible to declare named arguements. + + For example: + args frontend=fe_id src dst + + +event + Set the event that triggers sending of the message. + + Argument : + is the event name. + + Supported events are: + - on-client-session + - on-server-connectiob + - on-frontend-tcp-request + - on-backend-tcp-request + - on-tcp-response + - on-frontend-http-request + - on-backend-http-request + - on-http-response + + See section 3.5 about Events. + +2.4. Example +------------- + +Here is a simple but complete example that sends client-ip address to a ip +reputation service. This service can set the variable "ip_score" which is an +integer between 0 and 100, indicating its reputation (100 means totally safe +and 0 a blacklisted IP with no doubt). + + ### + ### HAProxy configuration + frontend www + mode http + bind *:80 + + filter spoe engine ip-reputation config spoe-ip-reputation.conf + + # Reject connection if the IP reputation is under 20 + tcp-request content reject if { var(sess.iprep.ip_score) -m int lt 20 } + + default_backend http-servers + + backend http-servers + mode http + server http A.B.C.D:80 + + backend iprep-servers + mode tcp + balance roundrobin + + timeout connect 5s # greater than hello timeout + timeout server 3m # greater than idle timeout + + server iprep1 A1.B1.C1.D1:12345 + server iprep2 A2.B2.C2.D2:12345 + + #### + ### spoe-ip-reputation.conf + [ip-reputation] + + spoe-agent iprep-agent + messages get-ip-reputation + + option var-prefix iprep + + timeout hello 2s + timeout ack 10ms + timeout idle 2m + + use-backend iprep-servers + + spoe-message get-ip-reputation + args ip=src + event on-client-session + + +3. SPOP specification +---------------------- + +3.1. Data types +---------------- + +Here is the bytewise representation of typed data: + + TYPED-DATA : + +Supported types and their representation are: + + TYPE | ID | DESCRIPTION + -----------------------------+-----+---------------------------------- + NULL | 0 | NULL : <0> + Boolean | 1 | BOOL : <1+FLAG> + 32bits signed integer | 2 | INT32 : <2> + 32bits unsigned integer | 3 | UINT32 : <3> + 64bits signed integer | 4 | INT64 : <4> + 32bits unsigned integer | 5 | UNIT64 : <5> + IPV4 | 6 | IPV4 : <6> + IPV6 | 7 | IPV6 : <7> + String | 8 | STRING : <8> + Binary | 9 | BINARY : <9> + 10 -> 15 unused/reserved | - | - + -----------------------------+-----+---------------------------------- + +Variable-length integer (varint) are encoded using Peers encoding: + + + 0 <= X < 240 : 1 byte (7.875 bits) [ XXXX XXXX ] + 240 <= X < 2288 : 2 bytes (11 bits) [ 1111 XXXX ] [ 0XXX XXXX ] + 2288 <= X < 264432 : 3 bytes (18 bits) [ 1111 XXXX ] [ 1XXX XXXX ] [ 0XXX XXXX ] + 264432 <= X < 33818864 : 4 bytes (25 bits) [ 1111 XXXX ] [ 1XXX XXXX ]*2 [ 0XXX XXXX ] + 33818864 <= X < 4328786160 : 5 bytes (32 bits) [ 1111 XXXX ] [ 1XXX XXXX ]*3 [ 0XXX XXXX ] + ... + +For booleans, the value (true or false) is the first bit in the FLAGS +bitfield. if this bit is set to 0, then the boolean is evaluated as false, +otherwise, the boolean is evaluated as true. + +3.2. Frames +------------ + +Exchange between HAProxy and agents are made using FRAME packets. All frames +must be prefixed with their size encoded on 4 bytes in network byte order: + + + +A frame always starts with its type, on one byte, followed by metadata +containing flags, on 4 bytes and a two variable-length integer representing the +stream identifier and the frame identifier inside the stream: + + FRAME : + METADATA : + +Then comes the frame payload. Depending on the frame type, the payload can be +of three types: a simple key/value list, a list of messages or a list of +actions. + + FRAME-PAYLOAD : | | + + LIST-OF-MESSAGES : [ ... ] + MESSAGE-NAME : + + LIST-OF-ACTIONS : [ ... ] + ACTION-ARGS : [ ... ] + + KV-LIST : [ ... ] + KV-NAME : + KV-VALUE : + + FLAGS : 0 1-31 + +---+-----------+ + | F| | + | I| RESERVED | + | N| | + +--+------------+ + + FIN: Indicates that this is the final payload fragment. The first fragment + may also be the final fragment. + +Frames cannot exceed a maximum size negociated between HAProxy and agents +during the HELLO handshake. Most of time, payload will be small enough to send +it in one frame. But when supported by the peer, it will be possible to +fragment huge payload on many frames. This ability is announced during the +HELLO handshake and it can be asynmetric (supported by agents but not by +HAProxy or the opposite). The following rules apply to fragmentation: + + * An unfragemnted payload consists of a single frame with the FIN bit set. + + * A fragemented payload consists of several frames with the FIN bit clear and + terminated by a single frame with the FIN bit set. All these frames must + share the same STREAM-ID and FRAME-ID. And, of course, the FRAME-TYPE must + be the same. + +Beside the support of fragmented payload by a peer, some payload must not be +fragmented. See below for details. + +IMPORTANT : The maximum size supported by peers for a frame must be greater or +equal to 256 bytes. + +3.2.1. Frame capabilities +-------------------------- + +Here are the list of official capabilities that HAProxy and agents can support: + + * fragmentation: This is the abaility for a peer to support fragmented + payload in received frames. + +Unsupported or unknown capabilities are silently ignored, when possible. + +3.2.2. Frame types overview +---------------------------- + +Here are types of frame supported by SPOE. Frames sent by HAProxy come first, +then frames sent by agents : + + TYPE | ID | DESCRIPTION + -----------------------------+-----+------------------------------------- + HAPROXY-HELLO | 1 | Sent by HAProxy when it opens a + | | connection on an agent. + | | + HAPROXY-DISCONNECT | 2 | Sent by HAProxy when it want to close + | | the connection or in reply to an + | | AGENT-DISCONNECT frame + | | + NOTIFY | 3 | Sent by HAProxy to pass information + | | to an agent + -----------------------------+-----+------------------------------------- + AGENT-HELLO | 101 | Reply to a HAPROXY-HELLO frame, when + | | the connection is established + | | + AGENT-DISCONNECT | 102 | Sent by an agent just before closing + | | the connection + | | + ACK | 103 | Sent to acknowledge a NOTIFY frame + -----------------------------+-----+------------------------------------- + +Unknown frames may be silently skipped. + +3.2.3. Workflow +---------------- + + * Successful HELLO handshake: + + HAPROXY AGENT SRV + | HAPROXY-HELLO | + | --------------------------> | + | | + | AGENT-HELLO | + | <-------------------------- | + | | + + + * Error encountered by agent during the HELLO handshake: + + HAPROXY AGENT SRV + | HAPROXY-HELLO | + | --------------------------> | + | | + | DISCONNECT + close() | + | <-------------------------- | + | | + + * Error encountered by HAProxy during the HELLO handshake: + + HAPROXY AGENT SRV + | HAPROXY-HELLO | + | --------------------------> | + | | + | AGENT-HELLO | + | <-------------------------- | + | | + | DISCONNECT | + | --------------------------> | + | | + | DISCONNECT + close() | + | <-------------------------- | + | | + + * Notify / Ack exchange: + + HAPROXY AGENT SRV + | NOTIFY | + | --------------------------> | + | | + | ACK | + | <-------------------------- | + | | + + * Connection closed by haproxy: + + HAPROXY AGENT SRV + | DISCONNECT | + | --------------------------> | + | | + | DISCONNECT + close() | + | <-------------------------- | + | | + + * Connection closed by agent: + + HAPROXY AGENT SRV + | DISCONNECT + close() | + | <-------------------------- | + | | + +3.2.4. Frame: HAPROXY-HELLO +---------------------------- + +This frame is the first one exchanged between HAProxy and an agent, when the +connection is established. The payload of this frame is a KV-LIST. It cannot be +fragmented. STREAM-ID and FRAME-ID are must be set 0. + +Following items are mandatory in the KV-LIST: + + * "supported-versions" + + Last SPOP major versions supported by HAProxy. It is a comma-separated list + of versions, following the format "Major.Minor". Spaces must be ignored, if + any. When a major version is announced by HAProxy, it means it also support + all previous minor versions. + + Example: "2.0, 1.5" means HAProxy supports SPOP 2.0 and 1.0 to 1.5 + + * "max-frame-size" + + This is the maximum size allowed for a frame. The HAPROXY-HELLO frame must + be lower or equal to this value. + + * "capabilities" + + This a comma-separated list of capabilities supported by HAProxy. Spaces + must be ignored, if any. + +To finish the HELLO handshake, the agent must return an AGENT-HELLO frame with +its supported SPOP version, the lower value between its maximum size allowed +for a frame and the HAProxy one and capabilities it supports. If an error +occurs or if an incompatibility is detected with the agent configuration, an +AGENT-DISCONNECT frame must be returned. + +3.2.5. Frame: AGENT-HELLO +-------------------------- + +This frame is sent in reply to a HAPROXY-HELLO frame to finish a HELLO +handshake. As for HAPROXY-HELLO frame, STREAM-ID and FRAME-ID are also set +0. The payload of this frame is a KV-LIST and it cannot be fragmented. + +Following items are mandatory in the KV-LIST: + + * "version" + + This is the SPOP version the agent supports. It must follow the format + "Major.Minor" and it must be lower or equal than one of major versions + announced by HAProxy. + + * "max-frame-size" + + This is the maximum size allowed for a frame. It must be lower or equal to + the value in the HAPROXY-HELLO frame. This value will be used for all + subsequent frames. + + * "capabilities" + + This a comma-separated list of capabilities supported by agent. Spaces must + be ignored, if any. + +At this time, if everything is ok for HAProxy (supported version and valid +max-frame-size value), the HELLO handshake is successfully completed. Else, +HAProxy sends a HAPROXY-DISCONNECT frame with the corresponding error. + +3.2.6. Frame: NOTIFY +--------------------- + +Information are sent to the agents inside NOTIFY frames. These frames are +attached to a stream, so STREAM-ID and FRAME-ID must be set. The payload of +NOTIFY frames is a LIST-OF-MESSAGES and, if supported by agents, it can be +fragmented. + +NOTIFY frames must be acknowledge by agents sending an ACK frame, repeating +right STREAM-ID and FRAME-ID. + +3.2.7. Frame: ACK +------------------ + +ACK frames must be sent by agents to reply to NOTIFY frames. STREAM-ID and +FRAME-ID found in a NOTIFY frame must be reuse in the corresponding ACK +frame. The payload of ACK frames is a LIST-OF-ACTIONS and, if supported by +HAProxy, it can be fragmented. + +3.2.8. Frame: HAPROXY-DISCONNECT +--------------------------------- + +If an error occurs, at anytime, from the HAProxy side, a HAPROXY-DISCONNECT +frame is sent with information describing the error. HAProxy will wait an +AGENT-DISCONNECT frame in reply. All other frames will be ignored. The agent +must then close the socket. + +The payload of this frame is a KV-LIST. It cannot be fragmented. STREAM-ID and +FRAME-ID are must be set 0. + +Following items are mandatory in the KV-LIST: + + * "status-code" + + This is the code corresponding to the error. + + * "message" + + This is a textual message describing the error. + +For more information about known errors, see section "Errors & timeouts" + +3.2.9. Frame: AGENT-DISCONNECT +------------------------------- + +If an error occurs, at anytime, from the agent size, a AGENT-DISCONNECT frame +is sent, with information desribing the error. such frame is also sent in reply +to a HAPROXY-DISCONNECT. The agent must close the socket just after sending +this frame. + +The payload of this frame is a KV-LIST. It cannot be fragmented. STREAM-ID and +FRAME-ID are must be set 0. + +Following items are mandatory in the KV-LIST: + + * "status-code" + + This is the code corresponding to the error. + + * "message" + + This is a textual message describing the error. + +For more information about known errors, see section "Errors & timeouts" + +3.3. Events & Messages +----------------------- + +Information about streams are sent in NOTIFY frames. You can specify which kind +of information to send by defining "spoe-message" sections in your SPOE +configuration file. for each "spoe-message" there will be a message in a NOTIFY +frame when the right event is triggered. + +A NOTIFY frame is sent for an specific event when there is at least one +"spoe-message" attached to this event. All messages for an event will be added +in the same NOTIFY frame. + +Here is the list of supported events: + + * on-client-session is triggered when a new client session is created. + This event is only available for SPOE filters + declared in a frontend or a listen section. + + * on-frontend-tcp-request is triggered just before the evaluation of + "tcp-request content" rules on the frontend side. + This event is only available for SPOE filters + declared in a frontend or a listen section. + + * on-backend-tcp-request is triggered just before the evaluation of + "tcp-request content" rules on the backend side. + This event is skipped for SPOE filters declared + in a listen section. + + * on-frontend-http-request is triggered just before the evaluation of + "http-request" rules on the frontend side. This + event is only available for SPOE filters declared + in a frontend or a listen section. + + * on-backend-http-request is triggered just before the evaluation of + "http-request" rules on the backend side. This + event is skipped for SPOE filters declared in a + listen section. + + * on-server-session is triggered when the session with the server is + established. + + * on-tcp-response is triggered just before the evaluation of + "tcp-response content" rules. + + * on-http-response is triggered just before the evaluation of + "http-response" rules. + + +The stream processing will loop on these events, when triggered, waiting the +agent reply. + +3.4. Actions +------------- + +An agent must acknowledge each NOTIFY frame by sending the corresponding ACK +frame. Actions can be added in these frames to dynamically take action on the +processing of a stream. + +Here is the list of supported actions: + + * set-var set the value for an existing variable. 3 arguments must be + attached to this action: the variable scope (proc, sess, txn, + req or req), the variable name (a string) and its value. + + ACTION-SET-VAR : + + SET-VAR : <1> + NB-ARGS : <3> + VAR-SCOPE : | | | | + VAR-NAME : + VAR-VALUE : + + PROCESS : <0> + SESSION : <1> + TRANSACTION : <2> + REQUEST : <3> + RESERVED : <4> + + * unset-var unset the value for an existing variable. 2 arguments must be + attached to this action: the variable scope (proc, sess, txn, + req or req) and the variable name (a string). + + ACTION-UNSET-VAR : + + SET-VAR : <1> + NB-ARGS : <3> + VAR-SCOPE : | | | | + VAR-NAME : + + PROCESS : <0> + SESSION : <1> + TRANSACTION : <2> + REQUEST : <3> + RESERVED : <4> + + +NOTE: Name of the variables will be automatically prefixed by HAProxy to avoid + name clashes with other variables used in HAProxy. Moreover, unknown + variable will be silently ignored. + +3.5. Error & timeouts +---------------------- + +Here is the list of all known errors: + + STATUS CODE | DESCRIPTION + ----------------+-------------------------------------------------------- + 0 | normal (no error occurred) + 1 | I/O error + 2 | A timeout occurred + 3 | frame is too big + 4 | invalid frame received + 5 | version value not found + 6 | max-frame-size value not found + 7 | capabilities value not found + 8 | unsupported version + 9 | max-frame-size too big or too small + 99 | an unknown error occurrde + ----------------+-------------------------------------------------------- + +An agent can define its own errors using a not yet assigned status code. + +IMPORTANT NOTE: For a specific stream, when an abnormal/unexpected error + occurs, the SPOE is disabled for all the transaction. So if you + have several events configured, such error on an event will + disabled all followings. For TCP streams, this will disable the + SPOE for the whole session. For HTTP streams, this will disable + it for the transaction (request and response). + +To avoid a stream to wait infinitly, you must carefully choose the +acknowledgement timeout. In most of cases, it will be quiet low. But it depends +on the responsivness of your service. + +You must also choose idle timeout carefully. Because connection with your +service depends on the backend configuration used by the SPOA, it is important +to use a lower value for idle timeout than the server timeout. Else the +connection will be closed by HAProxy. The same is true for hello timeout. You +should choose a lower value than the connect timeout. + + +/* + * Local variables: + * fill-column: 79 + * End: + */ diff --git a/doc/configuration.txt b/doc/configuration.txt index 2a6183cfc..313965052 100644 --- a/doc/configuration.txt +++ b/doc/configuration.txt @@ -106,6 +106,7 @@ Summary 9. Supported filters 9.1. Trace 9.2. HTTP compression +9.3. Stream Processing Offload Engine (SPOE) 1. Quick reminder about HTTP @@ -16182,6 +16183,37 @@ filters evaluation order. See also : "compression" +9.3. Stream Processing Offload Engine (SPOE) +-------------------------------------------- + +filter spoe [engine ] config + + Arguments : + + is the engine name that will be used to find the right scope in + the configuration file. If not provided, all the file will be + parsed. + + is the path of the engine configuration file. This file can + contain configuration of several engines. In this case, each + part must be placed in its own scope. + +The Stream Processing Offload Engine (SPOE) is a filter communicating with +external components. It allows the offload of some specifics processing on the +streams in tierce applications. These external components and information +exchanged with them are configured in dedicated files, for the main part. It +also requires dedicated backends, defined in HAProxy configuration. + +SPOE communicates with external components using an in-house binary protocol, +the Stream Processing Offload Protocol (SPOP). + +For all information about the SPOE configuation and the SPOP specification, see +"doc/SPOE.txt". + +Important note: + The SPOE filter is highly experimental for now and was not heavily + tested. It is really not production ready. So use it carefully. + /* * Local variables: * fill-column: 79 diff --git a/include/types/applet.h b/include/types/applet.h index 91d7d8fec..1f094e6ba 100644 --- a/include/types/applet.h +++ b/include/types/applet.h @@ -145,6 +145,14 @@ struct appctx { struct { char **var; } env; + struct { + struct task *task; + void *ctx; + void *agent; + unsigned int version; + unsigned int max_frame_size; + struct list list; + } spoe; /* used by SPOE filter */ } ctx; /* used by stats I/O handlers to dump the stats */ }; diff --git a/include/types/arg.h b/include/types/arg.h index fef12ec55..7576f8ac3 100644 --- a/include/types/arg.h +++ b/include/types/arg.h @@ -76,6 +76,7 @@ enum { ARGC_RDR, /* redirect */ ARGC_CAP, /* capture rule */ ARGC_SRV, /* server line */ + ARGC_SPOE, /* spoe message args */ }; /* flags used when compiling and executing regex */ diff --git a/src/flt_spoe.c b/src/flt_spoe.c new file mode 100644 index 000000000..1ebdbdaf0 --- /dev/null +++ b/src/flt_spoe.c @@ -0,0 +1,3013 @@ +/* + * Stream processing offload engine management. + * + * Copyright 2016 HAProxy Technologies, Christopher Faulet + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + * + */ +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if defined(DEBUG_SPOE) || defined(DEBUG_FULL) +#define SPOE_PRINTF(x...) fprintf(x) +#else +#define SPOE_PRINTF(x...) +#endif + +/* Helper to get ctx inside an appctx */ +#define APPCTX_SPOE(appctx) ((appctx)->ctx.spoe) + +/* TODO: add an option to customize these values */ +/* The maximum number of new applet waiting the end of the hello handshake */ +#define MAX_NEW_SPOE_APPLETS 5 + +/* The maximum number of error when a stream is waiting of a SPOE applet */ +#define MAX_NEW_SPOE_APPLET_ERRS 3 + +/* Minimal size for a frame */ +#define MIN_FRAME_SIZE 256 + +/* Flags set on the SPOE context */ +#define SPOE_CTX_FL_CLI_CONNECTED 0x00000001 /* Set after that on-client-session event was processed */ +#define SPOE_CTX_FL_SRV_CONNECTED 0x00000002 /* Set after that on-server-session event was processed */ +#define SPOE_CTX_FL_REQ_PROCESS 0x00000004 /* Set when SPOE is processing the request */ +#define SPOE_CTX_FL_RSP_PROCESS 0x00000008 /* Set when SPOE is processing the response */ + +#define SPOE_CTX_FL_PROCESS (SPOE_CTX_FL_REQ_PROCESS|SPOE_CTX_FL_RSP_PROCESS) + +#define SPOE_APPCTX_ERR_NONE 0x00000000 /* no error yet, leave it to zero */ +#define SPOE_APPCTX_ERR_TOUT 0x00000001 /* SPOE applet timeout */ + +/* All possible states for a SPOE context */ +enum spoe_ctx_state { + SPOE_CTX_ST_NONE = 0, + SPOE_CTX_ST_READY, + SPOE_CTX_ST_SENDING_MSGS, + SPOE_CTX_ST_WAITING_ACK, + SPOE_CTX_ST_DONE, + SPOE_CTX_ST_ERROR, +}; + +/* All possible states for a SPOE applet */ +enum spoe_appctx_state { + SPOE_APPCTX_ST_CONNECT = 0, + SPOE_APPCTX_ST_CONNECTING, + SPOE_APPCTX_ST_PROCESSING, + SPOE_APPCTX_ST_DISCONNECT, + SPOE_APPCTX_ST_DISCONNECTING, + SPOE_APPCTX_ST_EXIT, + SPOE_APPCTX_ST_END, +}; + +/* All supported SPOE actions */ +enum spoe_action_type { + SPOE_ACT_T_SET_VAR = 1, + SPOE_ACT_T_UNSET_VAR, + SPOE_ACT_TYPES, +}; + +/* All supported SPOE events */ +enum spoe_event { + SPOE_EV_NONE = 0, + + /* Request events */ + SPOE_EV_ON_CLIENT_SESS = 1, + SPOE_EV_ON_TCP_REQ_FE, + SPOE_EV_ON_TCP_REQ_BE, + SPOE_EV_ON_HTTP_REQ_FE, + SPOE_EV_ON_HTTP_REQ_BE, + + /* Response events */ + SPOE_EV_ON_SERVER_SESS, + SPOE_EV_ON_TCP_RSP, + SPOE_EV_ON_HTTP_RSP, + + SPOE_EV_EVENTS +}; + +/* Errors triggerd by SPOE applet */ +enum spoe_frame_error { + SPOE_FRM_ERR_NONE = 0, + SPOE_FRM_ERR_IO, + SPOE_FRM_ERR_TOUT, + SPOE_FRM_ERR_TOO_BIG, + SPOE_FRM_ERR_INVALID, + SPOE_FRM_ERR_NO_VSN, + SPOE_FRM_ERR_NO_FRAME_SIZE, + SPOE_FRM_ERR_NO_CAP, + SPOE_FRM_ERR_BAD_VSN, + SPOE_FRM_ERR_BAD_FRAME_SIZE, + SPOE_FRM_ERR_UNKNOWN = 99, + SPOE_FRM_ERRS, +}; + +/* Scopes used for variables set by agents. It is a way to be agnotic to vars + * scope. */ +enum spoe_vars_scope { + SPOE_SCOPE_PROC = 0, /* <=> SCOPE_PROC */ + SPOE_SCOPE_SESS, /* <=> SCOPE_SESS */ + SPOE_SCOPE_TXN, /* <=> SCOPE_TXN */ + SPOE_SCOPE_REQ, /* <=> SCOPE_REQ */ + SPOE_SCOPE_RES, /* <=> SCOPE_RES */ +}; + + +/* Describe an argument that will be linked to a message. It is a sample fetch, + * with an optional name. */ +struct spoe_arg { + char *name; /* Name of the argument, may be NULL */ + unsigned int name_len; /* The name length, 0 if NULL */ + struct sample_expr *expr; /* Sample expression */ + struct list list; /* Used to chain SPOE args */ +}; + +/* Used during the config parsing only because, when a SPOE agent section is + * parsed, messages can be undefined. */ +struct spoe_msg_placeholder { + char *id; /* SPOE message placeholder id */ + struct list list; /* Use to chain SPOE message placeholders */ +}; + +/* Describe a message that will be sent in a NOTIFY frame. A message has a name, + * an argument list (see above) and it is linked to a specific event. */ +struct spoe_message { + char *id; /* SPOE message id */ + unsigned int id_len; /* The message id length */ + struct spoe_agent *agent; /* SPOE agent owning this SPOE message */ + struct { + char *file; /* file where the SPOE message appears */ + int line; /* line where the SPOE message appears */ + } conf; /* config information */ + struct list args; /* Arguments added when the SPOE messages is sent */ + struct list list; /* Used to chain SPOE messages */ + + enum spoe_event event; /* SPOE_EV_* */ +}; + +/* Describe a SPOE agent. */ +struct spoe_agent { + char *id; /* SPOE agent id (name) */ + struct { + char *file; /* file where the SPOE agent appears */ + int line; /* line where the SPOE agent appears */ + } conf; /* config information */ + union { + struct proxy *be; /* Backend used by this agent */ + char *name; /* Backend name used during conf parsing */ + } b; + struct { + unsigned int hello; /* Max time to receive AGENT-HELLO frame */ + unsigned int idle; /* Max Idle timeout */ + unsigned int ack; /* Max time to acknowledge a NOTIFY frame */ + } timeout; + + char *var_pfx; /* Prefix used for vars set by the agent */ + + struct list cache; /* List used to cache SPOE streams. In + * fact, we cache the SPOE applect ctx */ + + struct list messages[SPOE_EV_EVENTS]; /* List of SPOE messages that will be sent + * for each supported events */ + + struct list applet_wq; /* List of streams waiting for a SPOE applet */ + unsigned int new_applets; /* The number of new SPOE applets */ +}; + +/* SPOE filter configuration */ +struct spoe_config { + struct proxy *proxy; /* Proxy owning the filter */ + struct spoe_agent *agent; /* Agent used by this filter */ + struct proxy agent_fe; /* Agent frontend */ +}; + +/* SPOE context attached to a stream. It is the main structure that handles the + * processing offload */ +struct spoe_context { + struct filter *filter; /* The SPOE filter */ + struct stream *strm; /* The stream that should be offloaded */ + struct appctx *appctx; /* The SPOE appctx */ + struct list *messages; /* List of messages that will be sent during the stream processing */ + struct buffer *buffer; /* Buffer used to store a NOTIFY or ACK frame */ + struct list buffer_wait; /* position in the list of streams waiting for a buffer */ + struct list applet_wait; /* position in the list of streams waiting for a SPOE applet */ + + unsigned int errs; /* The number of errors to acquire a SPOE applet */ + + enum spoe_ctx_state state; /* SPOE_CTX_ST_* */ + unsigned int flags; /* SPOE_CTX_FL_* */ + + unsigned int stream_id; /* stream_id and frame_id are used */ + unsigned int frame_id; /* to map NOTIFY and ACK frames */ + +}; + +/* Set if the handle on SIGUSR1 is registered */ +static int sighandler_registered = 0; + +/* proxy used during the parsing */ +struct proxy *curproxy = NULL; + +/* The name of the SPOE engine, used during the parsing */ +char *curengine = NULL; + +/* SPOE agent used during the parsing */ +struct spoe_agent *curagent = NULL; + +/* SPOE message used during the parsing */ +struct spoe_message *curmsg = NULL; + +/* list of SPOE messages and placeholders used during the parsing */ +struct list curmsgs; +struct list curmps; + +/* Pool used to allocate new SPOE contexts */ +static struct pool_head *pool2_spoe_ctx = NULL; + +/* Temporary variables used to ease error processing */ +int spoe_status_code = SPOE_FRM_ERR_NONE; +char spoe_reason[256]; + +struct flt_ops spoe_ops; + +static void offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx); +static void on_new_spoe_appctx_failure(struct spoe_agent *agent); +static void on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx); + +/******************************************************************** + * helper functions/globals + ********************************************************************/ +static void +release_spoe_msg_placeholder(struct spoe_msg_placeholder *mp) +{ + if (!mp) + return; + free(mp->id); + free(mp); +} + + +static void +release_spoe_message(struct spoe_message *msg) +{ + struct spoe_arg *arg, *back; + + if (!msg) + return; + free(msg->id); + free(msg->conf.file); + list_for_each_entry_safe(arg, back, &msg->args, list) { + release_sample_expr(arg->expr); + free(arg->name); + LIST_DEL(&arg->list); + free(arg); + } + free(msg); +} + +static void +release_spoe_agent(struct spoe_agent *agent) +{ + struct spoe_message *msg, *back; + int i; + + if (!agent) + return; + free(agent->id); + free(agent->conf.file); + free(agent->var_pfx); + for (i = 0; i < SPOE_EV_EVENTS; ++i) { + list_for_each_entry_safe(msg, back, &agent->messages[i], list) { + LIST_DEL(&msg->list); + release_spoe_message(msg); + } + } + free(agent); +} + +static const char *spoe_frm_err_reasons[SPOE_FRM_ERRS] = { + [SPOE_FRM_ERR_NONE] = "normal", + [SPOE_FRM_ERR_IO] = "I/O error", + [SPOE_FRM_ERR_TOUT] = "a timeout occurred", + [SPOE_FRM_ERR_TOO_BIG] = "frame is too big", + [SPOE_FRM_ERR_INVALID] = "invalid frame received", + [SPOE_FRM_ERR_NO_VSN] = "version value not found", + [SPOE_FRM_ERR_NO_FRAME_SIZE] = "max-frame-size value not found", + [SPOE_FRM_ERR_NO_CAP] = "capabilities value not found", + [SPOE_FRM_ERR_BAD_VSN] = "unsupported version", + [SPOE_FRM_ERR_BAD_FRAME_SIZE] = "max-frame-size too big or too small", + [SPOE_FRM_ERR_UNKNOWN] = "an unknown error occurred", +}; + +static const char *spoe_event_str[SPOE_EV_EVENTS] = { + [SPOE_EV_ON_CLIENT_SESS] = "on-client-session", + [SPOE_EV_ON_TCP_REQ_FE] = "on-frontend-tcp-request", + [SPOE_EV_ON_TCP_REQ_BE] = "on-backend-tcp-request", + [SPOE_EV_ON_HTTP_REQ_FE] = "on-frontend-http-request", + [SPOE_EV_ON_HTTP_REQ_BE] = "on-backend-http-request", + + [SPOE_EV_ON_SERVER_SESS] = "on-server-session", + [SPOE_EV_ON_TCP_RSP] = "on-tcp-response", + [SPOE_EV_ON_HTTP_RSP] = "on-http-response", +}; + + +#if defined(DEBUG_SPOE) || defined(DEBUG_FULL) + +static const char *spoe_ctx_state_str[SPOE_CTX_ST_ERROR+1] = { + [SPOE_CTX_ST_NONE] = "NONE", + [SPOE_CTX_ST_READY] = "READY", + [SPOE_CTX_ST_SENDING_MSGS] = "SENDING_MSGS", + [SPOE_CTX_ST_WAITING_ACK] = "WAITING_ACK", + [SPOE_CTX_ST_DONE] = "DONE", + [SPOE_CTX_ST_ERROR] = "ERROR", +}; + +static const char *spoe_appctx_state_str[SPOE_APPCTX_ST_END+1] = { + [SPOE_APPCTX_ST_CONNECT] = "CONNECT", + [SPOE_APPCTX_ST_CONNECTING] = "CONNECTING", + [SPOE_APPCTX_ST_PROCESSING] = "PROCESSING", + [SPOE_APPCTX_ST_DISCONNECT] = "DISCONNECT", + [SPOE_APPCTX_ST_DISCONNECTING] = "DISCONNECTING", + [SPOE_APPCTX_ST_EXIT] = "EXIT", + [SPOE_APPCTX_ST_END] = "END", +}; + +#endif +/******************************************************************** + * Functions that encode/decode SPOE frames + ********************************************************************/ +/* Frame Types sent by HAProxy and by agents */ +enum spoe_frame_type { + /* Frames sent by HAProxy */ + SPOE_FRM_T_HAPROXY_HELLO = 1, + SPOE_FRM_T_HAPROXY_DISCON, + SPOE_FRM_T_HAPROXY_NOTIFY, + + /* Frames sent by the agents */ + SPOE_FRM_T_AGENT_HELLO = 101, + SPOE_FRM_T_AGENT_DISCON, + SPOE_FRM_T_AGENT_ACK +}; + +/* All supported data types */ +enum spoe_data_type { + SPOE_DATA_T_NULL = 0, + SPOE_DATA_T_BOOL, + SPOE_DATA_T_INT32, + SPOE_DATA_T_UINT32, + SPOE_DATA_T_INT64, + SPOE_DATA_T_UINT64, + SPOE_DATA_T_IPV4, + SPOE_DATA_T_IPV6, + SPOE_DATA_T_STR, + SPOE_DATA_T_BIN, + SPOE_DATA_TYPES +}; + +/* Masks to get data type or flags value */ +#define SPOE_DATA_T_MASK 0x0F +#define SPOE_DATA_FL_MASK 0xF0 + +/* Flags to set Boolean values */ +#define SPOE_DATA_FL_FALSE 0x00 +#define SPOE_DATA_FL_TRUE 0x10 + +/* Helper to get static string length, excluding the terminating null byte */ +#define SLEN(str) (sizeof(str)-1) + +/* Predefined key used in HELLO/DISCONNECT frames */ +#define SUPPORTED_VERSIONS_KEY "supported-versions" +#define VERSION_KEY "version" +#define MAX_FRAME_SIZE_KEY "max-frame-size" +#define CAPABILITIES_KEY "capabilities" +#define STATUS_CODE_KEY "status-code" +#define MSG_KEY "message" + +struct spoe_version { + char *str; + int min; + int max; +}; + +/* All supported versions */ +static struct spoe_version supported_versions[] = { + {"1.0", 1000, 1000}, + {NULL, 0, 0} +}; + +/* Comma-separated list of supported versions */ +#define SUPPORTED_VERSIONS_VAL "1.0" + +/* Comma-separated list of supported capabilities (none for now) */ +#define CAPABILITIES_VAL "" + +static int +decode_spoe_version(const char *str, size_t len) +{ + char tmp[len+1], *start, *end; + double d; + int vsn = -1; + + memset(tmp, 0, len+1); + memcpy(tmp, str, len); + + start = tmp; + while (isspace(*start)) + start++; + + d = strtod(start, &end); + if (d == 0 || start == end) + goto out; + + if (*end) { + while (isspace(*end)) + end++; + if (*end) + goto out; + } + vsn = (int)(d * 1000); + out: + return vsn; +} + +/* Encode a variable-length integer. This function never fails and returns the + * number of written bytes. */ +static int +encode_spoe_varint(uint64_t i, char *buf) +{ + int idx; + + if (i < 240) { + buf[0] = (unsigned char)i; + return 1; + } + + buf[0] = (unsigned char)i | 240; + i = (i - 240) >> 4; + for (idx = 1; i >= 128; ++idx) { + buf[idx] = (unsigned char)i | 128; + i = (i - 128) >> 7; + } + buf[idx++] = (unsigned char)i; + return idx; +} + +/* Decode a varable-length integer. If the decoding fails, -1 is returned. This + * happens when the buffer's end in reached. On success, the number of read + * bytes is returned. */ +static int +decode_spoe_varint(const char *buf, const char *end, uint64_t *i) +{ + unsigned char *msg = (unsigned char *)buf; + int idx = 0; + + if (msg > (unsigned char *)end) + return -1; + + if (msg[0] < 240) { + *i = msg[0]; + return 1; + } + *i = msg[0]; + do { + ++idx; + if (msg+idx > (unsigned char *)end) + return -1; + *i += (uint64_t)msg[idx] << (4 + 7 * (idx-1)); + } while (msg[idx] >= 128); + return (idx + 1); +} + +/* Encode a string. The string will be prefix by its length, encoded as a + * variable-length integer. This function never fails and returns the number of + * written bytes. */ +static int +encode_spoe_string(const char *str, size_t len, char *dst) +{ + int idx = 0; + + if (!len) { + dst[0] = 0; + return 1; + } + + idx += encode_spoe_varint(len, dst); + memcpy(dst+idx, str, len); + return (idx + len); +} + +/* Decode a string. Its length is decoded first as a variable-length integer. If + * it succeeds, and if the string length is valid, the begin of the string is + * saved in <*str>, its length is saved in <*len> and the total numbre of bytes + * read is returned. If an error occurred, -1 is returned and <*str> remains + * NULL. */ +static int +decode_spoe_string(char *buf, char *end, char **str, uint64_t *len) +{ + int i, idx = 0; + + *str = NULL; + *len = 0; + + if ((i = decode_spoe_varint(buf, end, len)) == -1) + goto error; + idx += i; + if (buf + idx + *len > end) + goto error; + + *str = buf+idx; + return (idx + *len); + + error: + return -1; +} + +/* Skip a typed data. If an error occurred, -1 is returned, otherwise the number + * of bytes read is returned. A types data is composed of a type (1 byte) and + * corresponding data: + * - boolean: non additional data (0 bytes) + * - integers: a variable-length integer (see decode_spoe_varint) + * - ipv4: 4 bytes + * - ipv6: 16 bytes + * - binary and string: a buffer prefixed by its size, a variable-length + * integer (see decode_spoe_string) */ +static int +skip_spoe_data(char *frame, char *end) +{ + uint64_t sz = 0; + int i, idx = 0; + + if (frame > end) + return -1; + + switch (frame[idx++] & SPOE_DATA_T_MASK) { + case SPOE_DATA_T_BOOL: + break; + case SPOE_DATA_T_INT32: + case SPOE_DATA_T_INT64: + case SPOE_DATA_T_UINT32: + case SPOE_DATA_T_UINT64: + if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1) + return -1; + idx += i; + break; + case SPOE_DATA_T_IPV4: + idx += 4; + break; + case SPOE_DATA_T_IPV6: + idx += 16; + break; + case SPOE_DATA_T_STR: + case SPOE_DATA_T_BIN: + if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1) + return -1; + idx += i + sz; + break; + } + + if (frame+idx > end) + return -1; + return idx; +} + +/* Decode a typed data. If an error occurred, -1 is returned, otherwise the + * number of read bytes is returned. See skip_spoe_data for details. */ +static int +decode_spoe_data(char *frame, char *end, struct sample *smp) +{ + uint64_t sz = 0; + int type, i, idx = 0; + + if (frame > end) + return -1; + + type = frame[idx++]; + switch (type & SPOE_DATA_T_MASK) { + case SPOE_DATA_T_BOOL: + smp->data.u.sint = ((type & SPOE_DATA_FL_TRUE) == SPOE_DATA_FL_TRUE); + smp->data.type = SMP_T_BOOL; + break; + case SPOE_DATA_T_INT32: + case SPOE_DATA_T_INT64: + case SPOE_DATA_T_UINT32: + case SPOE_DATA_T_UINT64: + if ((i = decode_spoe_varint(frame+idx, end, (uint64_t *)&smp->data.u.sint)) == -1) + return -1; + idx += i; + smp->data.type = SMP_T_SINT; + break; + case SPOE_DATA_T_IPV4: + if (frame+idx+4 > end) + return -1; + memcpy(&smp->data.u.ipv4, frame+idx, 4); + smp->data.type = SMP_T_IPV4; + idx += 4; + break; + case SPOE_DATA_T_IPV6: + if (frame+idx+16 > end) + return -1; + memcpy(&smp->data.u.ipv6, frame+idx, 16); + smp->data.type = SMP_T_IPV6; + idx += 16; + break; + case SPOE_DATA_T_STR: + if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1) + return -1; + idx += i; + if (frame+idx+sz > end) + return -1; + smp->data.u.str.str = frame+idx; + smp->data.u.str.len = sz; + smp->data.type = SMP_T_STR; + idx += sz; + break; + case SPOE_DATA_T_BIN: + if ((i = decode_spoe_varint(frame+idx, end, &sz)) == -1) + return -1; + idx += i; + if (frame+idx+sz > end) + return -1; + smp->data.u.str.str = frame+idx; + smp->data.u.str.len = sz; + smp->data.type = SMP_T_BIN; + idx += sz; + break; + } + + if (frame+idx > end) + return -1; + return idx; +} + +/* Skip an action in a frame received from an agent. If an error occurred, -1 is + * returned, otherwise the number of read bytes is returned. An action is + * composed of the action type followed by a typed data. */ +static int +skip_spoe_action(char *frame, char *end) +{ + int n, i, idx = 0; + + if (frame+2 > end) + return -1; + + idx++; /* Skip the action type */ + n = frame[idx++]; + while (n-- > 0) { + if ((i = skip_spoe_data(frame+idx, end)) == -1) + return -1; + idx += i; + } + + if (frame+idx > end) + return -1; + return idx; +} + +/* Encode HELLO frame sent by HAProxy to an agent. It returns the frame size on + * success, 0 if the frame can be ignored and -1 if an error occurred. */ +static int +prepare_spoe_hahello_frame(struct appctx *appctx, char *frame, size_t size) +{ + int idx = 0; + size_t max = (7 /* TYPE + METADATA */ + + 1 + SLEN(SUPPORTED_VERSIONS_KEY) + 1 + 1 + SLEN(SUPPORTED_VERSIONS_VAL) + + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 4 + + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + SLEN(CAPABILITIES_VAL)); + + if (size < max) + return -1; + + /* Frame type */ + frame[idx++] = SPOE_FRM_T_HAPROXY_HELLO; + + /* No flags for now */ + memset(frame+idx, 0, 4); + idx += 4; + + /* No stream-id and frame-id for HELLO frames */ + frame[idx++] = 0; + frame[idx++] = 0; + + /* There are 3 mandatory items: "supported-versions", "max-frame-size" + * and "capabilities" */ + + /* "supported-versions" K/V item */ + idx += encode_spoe_string(SUPPORTED_VERSIONS_KEY, SLEN(SUPPORTED_VERSIONS_KEY), frame+idx); + frame[idx++] = SPOE_DATA_T_STR; + idx += encode_spoe_string(SUPPORTED_VERSIONS_VAL, SLEN(SUPPORTED_VERSIONS_VAL), frame+idx); + + /* "max-fram-size" K/V item */ + idx += encode_spoe_string(MAX_FRAME_SIZE_KEY, SLEN(MAX_FRAME_SIZE_KEY), frame+idx); + frame[idx++] = SPOE_DATA_T_UINT32; + idx += encode_spoe_varint(APPCTX_SPOE(appctx).max_frame_size, frame+idx); + + /* "capabilities" K/V item */ + idx += encode_spoe_string(CAPABILITIES_KEY, SLEN(CAPABILITIES_KEY), frame+idx); + frame[idx++] = SPOE_DATA_T_STR; + idx += encode_spoe_string(CAPABILITIES_VAL, SLEN(CAPABILITIES_VAL), frame+idx); + + return idx; +} + +/* Encode DISCONNECT frame sent by HAProxy to an agent. It returns the frame + * size on success, 0 if the frame can be ignored and -1 if an error + * occurred. */ +static int +prepare_spoe_hadiscon_frame(struct appctx *appctx, char *frame, size_t size) +{ + const char *reason; + int rlen, idx = 0; + size_t max = (7 /* TYPE + METADATA */ + + 1 + SLEN(STATUS_CODE_KEY) + 1 + 2 + + 1 + SLEN(MSG_KEY) + 1 + 2 + 255); + + if (size < max) + return -1; + + /* Get the message corresponding to the status code */ + if (spoe_status_code >= SPOE_FRM_ERRS) + spoe_status_code = SPOE_FRM_ERR_UNKNOWN; + reason = spoe_frm_err_reasons[spoe_status_code]; + rlen = strlen(reason); + + /* Frame type */ + frame[idx++] = SPOE_FRM_T_HAPROXY_DISCON; + + /* No flags for now */ + memset(frame+idx, 0, 4); + idx += 4; + + /* No stream-id and frame-id for DISCONNECT frames */ + frame[idx++] = 0; + frame[idx++] = 0; + + /* There are 2 mandatory items: "status-code" and "message" */ + + /* "status-code" K/V item */ + idx += encode_spoe_string(STATUS_CODE_KEY, SLEN(STATUS_CODE_KEY), frame+idx); + frame[idx++] = SPOE_DATA_T_UINT32; + idx += encode_spoe_varint(spoe_status_code, frame+idx); + + /* "message" K/V item */ + idx += encode_spoe_string(MSG_KEY, SLEN(MSG_KEY), frame+idx); + frame[idx++] = SPOE_DATA_T_STR; + idx += encode_spoe_string(reason, rlen, frame+idx); + + return idx; +} + +/* Encode NOTIFY frame sent by HAProxy to an agent. It returns the frame size on + * success, 0 if the frame can be ignored and -1 if an error occurred. */ +static int +prepare_spoe_hanotify_frame(struct appctx *appctx, char *frame, size_t size) +{ + struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx; + int idx = 0; + + if (size < APPCTX_SPOE(appctx).max_frame_size) + return -1; + + frame[idx++] = SPOE_FRM_T_HAPROXY_NOTIFY; + + /* No flags for now */ + memset(frame+idx, 0, 4); + idx += 4; + + /* Set stream-id and frame-id */ + idx += encode_spoe_varint(ctx->stream_id, frame+idx); + idx += encode_spoe_varint(ctx->frame_id, frame+idx); + + /* Copy encoded messages */ + memcpy(frame+idx, ctx->buffer->p, ctx->buffer->i); + idx += ctx->buffer->i; + + return idx; +} + +/* Decode HELLO frame sent by an agent. It returns the number of by read bytes + * on success, 0 if the frame can be ignored and -1 if an error occurred. */ +static int +handle_spoe_agenthello_frame(struct appctx *appctx, char *frame, size_t size) +{ + int vsn, max_frame_size; + int i, idx = 0; + size_t min_size = (7 /* TYPE + METADATA */ + + 1 + SLEN(VERSION_KEY) + 1 + 1 + 3 + + 1 + SLEN(MAX_FRAME_SIZE_KEY) + 1 + 1 + + 1 + SLEN(CAPABILITIES_KEY) + 1 + 1 + 0); + + /* Check frame type */ + if (frame[idx++] != SPOE_FRM_T_AGENT_HELLO) + return 0; + + if (size < min_size) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + + /* Skip flags: fragmentation is not supported for now */ + idx += 4; + + /* stream-id and frame-id must be cleared */ + if (frame[idx] != 0 || frame[idx+1] != 0) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + idx += 2; + + /* There are 3 mandatory items: "version", "max-frame-size" and + * "capabilities" */ + + /* Loop on K/V items */ + vsn = max_frame_size = 0; + while (idx < size) { + char *str; + uint64_t sz; + + /* Decode the item key */ + idx += decode_spoe_string(frame+idx, frame+size, &str, &sz); + if (str == NULL) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + /* Check "version" K/V item */ + if (!memcmp(str, VERSION_KEY, sz)) { + /* The value must be a string */ + if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + idx += decode_spoe_string(frame+idx, frame+size, &str, &sz); + if (str == NULL) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + + vsn = decode_spoe_version(str, sz); + if (vsn == -1) { + spoe_status_code = SPOE_FRM_ERR_BAD_VSN; + return -1; + } + for (i = 0; supported_versions[i].str != NULL; ++i) { + if (vsn >= supported_versions[i].min && + vsn <= supported_versions[i].max) + break; + } + if (supported_versions[i].str == NULL) { + spoe_status_code = SPOE_FRM_ERR_BAD_VSN; + return -1; + } + } + /* Check "max-frame-size" K/V item */ + else if (!memcmp(str, MAX_FRAME_SIZE_KEY, sz)) { + int type; + + /* The value must be integer */ + type = frame[idx++]; + if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 && + (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 && + (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 && + (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + idx += i; + if (sz < MIN_FRAME_SIZE || sz > APPCTX_SPOE(appctx).max_frame_size) { + spoe_status_code = SPOE_FRM_ERR_BAD_FRAME_SIZE; + return -1; + } + max_frame_size = sz; + } + /* Skip "capabilities" K/V item for now */ + else { + /* Silently ignore unknown item */ + if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + idx += i; + } + } + + /* Final checks */ + if (!vsn) { + spoe_status_code = SPOE_FRM_ERR_NO_VSN; + return -1; + } + if (!max_frame_size) { + spoe_status_code = SPOE_FRM_ERR_NO_FRAME_SIZE; + return -1; + } + + APPCTX_SPOE(appctx).version = (unsigned int)vsn; + APPCTX_SPOE(appctx).max_frame_size = (unsigned int)max_frame_size; + return idx; +} + +/* Decode DISCONNECT frame sent by an agent. It returns the number of by read + * bytes on success, 0 if the frame can be ignored and -1 if an error + * occurred. */ +static int +handle_spoe_agentdiscon_frame(struct appctx *appctx, char *frame, size_t size) +{ + int i, idx = 0; + size_t min_size = (7 /* TYPE + METADATA */ + + 1 + SLEN(STATUS_CODE_KEY) + 1 + 1 + + 1 + SLEN(MSG_KEY) + 1 + 1); + + /* Check frame type */ + if (frame[idx++] != SPOE_FRM_T_AGENT_DISCON) + return 0; + + if (size < min_size) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + + /* Skip flags: fragmentation is not supported for now */ + idx += 4; + + /* stream-id and frame-id must be cleared */ + if (frame[idx] != 0 || frame[idx+1] != 0) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + idx += 2; + + /* There are 2 mandatory items: "status-code" and "message" */ + + /* Loop on K/V items */ + while (idx < size) { + char *str; + uint64_t sz; + + /* Decode the item key */ + idx += decode_spoe_string(frame+idx, frame+size, &str, &sz); + if (str == NULL) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + + /* Check "status-code" K/V item */ + if (!memcmp(str, STATUS_CODE_KEY, sz)) { + int type; + + /* The value must be an integer */ + type = frame[idx++]; + if ((type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT32 && + (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_INT64 && + (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT32 && + (type & SPOE_DATA_T_MASK) != SPOE_DATA_T_UINT64) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + if ((i = decode_spoe_varint(frame+idx, frame+size, &sz)) == -1) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + idx += i; + spoe_status_code = sz; + } + + /* Check "message" K/V item */ + else if (sz && !memcmp(str, MSG_KEY, sz)) { + /* The value must be a string */ + if ((frame[idx++] & SPOE_DATA_T_MASK) != SPOE_DATA_T_STR) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + idx += decode_spoe_string(frame+idx, frame+size, &str, &sz); + if (str == NULL || sz > 255) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + memcpy(spoe_reason, str, sz); + spoe_reason[sz] = 0; + } + else { + /* Silently ignore unknown item */ + if ((i = skip_spoe_data(frame+idx, frame+size)) == -1) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + idx += i; + } + } + + return idx; +} + + +/* Decode ACK frame sent by an agent. It returns the number of by read bytes on + * success, 0 if the frame can be ignored and -1 if an error occurred. */ +static int +handle_spoe_agentack_frame(struct appctx *appctx, char *frame, size_t size) +{ + struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx; + uint64_t stream_id, frame_id; + int idx = 0; + size_t min_size = (7 /* TYPE + METADATA */); + + /* Check frame type */ + if (frame[idx++] != SPOE_FRM_T_AGENT_ACK) + return 0; + + if (size < min_size) { + spoe_status_code = SPOE_FRM_ERR_INVALID; + return -1; + } + + /* Skip flags: fragmentation is not supported for now */ + idx += 4; + + /* Get the stream-id and the frame-id */ + idx += decode_spoe_varint(frame+idx, frame+size, &stream_id); + idx += decode_spoe_varint(frame+idx, frame+size, &frame_id); + + /* Check stream-id and frame-id */ + if (ctx->stream_id != (unsigned int)stream_id || + ctx->frame_id != (unsigned int)frame_id) + return 0; + + /* Copy encoded actions */ + b_reset(ctx->buffer); + memcpy(ctx->buffer->p, frame+idx, size-idx); + ctx->buffer->i = size-idx; + + return idx; +} + + +/******************************************************************** + * Functions that manage the SPOE applet + ********************************************************************/ +/* Callback function that catches applet timeouts. If a timeout occurred, we set + * st1> flag and the SPOE applet is woken up. */ +static struct task * +process_spoe_applet(struct task * task) +{ + struct appctx *appctx = task->context; + + appctx->st1 = SPOE_APPCTX_ERR_NONE; + if (tick_is_expired(task->expire, now_ms)) { + task->expire = TICK_ETERNITY; + appctx->st1 = SPOE_APPCTX_ERR_TOUT; + } + si_applet_want_get(appctx->owner); + appctx_wakeup(appctx); + return task; +} + +/* Remove a SPOE applet from the agent cache */ +static void +remove_spoe_applet_from_cache(struct appctx *appctx) +{ + struct appctx *a, *back; + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; + + if (LIST_ISEMPTY(&agent->cache)) + return; + + list_for_each_entry_safe(a, back, &agent->cache, ctx.spoe.list) { + if (a == appctx) { + LIST_DEL(&APPCTX_SPOE(appctx).list); + break; + } + } +} + + +/* Callback function that releases a SPOE applet. This happens when the + * connection with the agent is closed. */ +static void +release_spoe_applet(struct appctx *appctx) +{ + struct stream_interface *si = appctx->owner; + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; + struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx; + + if (appctx->st0 == SPOE_APPCTX_ST_CONNECT || + appctx->st0 == SPOE_APPCTX_ST_CONNECTING) + on_new_spoe_appctx_failure(agent); + + if (appctx->st0 != SPOE_APPCTX_ST_END) { + si_shutw(si); + si_shutr(si); + si_ic(si)->flags |= CF_READ_NULL; + appctx->st0 = SPOE_APPCTX_ST_END; + } + + if (ctx != NULL) { + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + ctx->appctx = NULL; + } + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, appctx); + + /* Release the task attached to the SPOE applet */ + if (APPCTX_SPOE(appctx).task) { + task_delete(APPCTX_SPOE(appctx).task); + task_free(APPCTX_SPOE(appctx).task); + } + + /* And remove it from the agent cache */ + remove_spoe_applet_from_cache(appctx); + APPCTX_SPOE(appctx).ctx = NULL; +} + +/* Send a SPOE frame to an agent. It return -2 when an error occurred, -1 when + * the frame can be ignored, 0 to retry later and 1 on success. The frame is + * encoded using the callback function . */ +static int +send_spoe_frame(struct appctx *appctx, + int (*prepare)(struct appctx *, char *, size_t)) +{ + struct stream_interface *si = appctx->owner; + int framesz, ret; + uint32_t netint; + + ret = prepare(appctx, trash.str, APPCTX_SPOE(appctx).max_frame_size); + if (ret <= 0) + goto skip_or_error; + framesz = ret; + netint = htonl(framesz); + ret = bi_putblk(si_ic(si), (char *)&netint, sizeof(netint)); + if (ret > 0) + ret = bi_putblk(si_ic(si), trash.str, framesz); + if (ret <= 0) { + if (ret == -1) + return -1; + return -2; + } + return 1; + + skip_or_error: + if (!ret) + return -1; + return -2; +} + +/* Receive a SPOE frame from an agent. It return -2 when an error occurred, -1 + * when the frame can be ignored, 0 to retry later and 1 on success. The frame + * is decoded using the callback function . */ +static int +recv_spoe_frame(struct appctx *appctx, + int (*handle)(struct appctx *, char *, size_t)) +{ + struct stream_interface *si = appctx->owner; + int framesz, ret; + uint32_t netint; + + ret = bo_getblk(si_oc(si), (char *)&netint, sizeof(netint), 0); + if (ret <= 0) + goto empty_or_error; + framesz = ntohl(netint); + if (framesz > APPCTX_SPOE(appctx).max_frame_size) { + spoe_status_code = SPOE_FRM_ERR_TOO_BIG; + return -2; + } + + ret = bo_getblk(si_oc(si), trash.str, framesz, sizeof(netint)); + if (ret <= 0) + goto empty_or_error; + bo_skip(si_oc(si), ret+sizeof(netint)); + + /* First check if the received frame is a DISCONNECT frame */ + ret = handle_spoe_agentdiscon_frame(appctx, trash.str, framesz); + if (ret != 0) { + if (ret > 0) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - disconnected by peer (%d): %s\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, + __FUNCTION__, appctx, spoe_status_code, + spoe_reason); + return 2; + } + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - error on frame (%s)\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, + __FUNCTION__, appctx, + spoe_frm_err_reasons[spoe_status_code]); + return -2; + } + if (handle == NULL) + goto out; + + /* If not, try to decode it */ + ret = handle(appctx, trash.str, framesz); + if (ret <= 0) { + if (!ret) + return -1; + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - error on frame (%s)\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, + __FUNCTION__, appctx, + spoe_frm_err_reasons[spoe_status_code]); + return -2; + } + out: + return 1; + + empty_or_error: + if (!ret) + return 0; + spoe_status_code = SPOE_FRM_ERR_IO; + return -2; +} + +/* I/O Handler processing messages exchanged with the agent */ +static void +handle_spoe_applet(struct appctx *appctx) +{ + struct stream_interface *si = appctx->owner; + struct stream *s = si_strm(si); + struct spoe_agent *agent = APPCTX_SPOE(appctx).agent; + struct spoe_context *ctx = APPCTX_SPOE(appctx).ctx; + int ret; + + switchstate: + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - appctx-state=%s\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, appctx, spoe_appctx_state_str[appctx->st0]); + + switch (appctx->st0) { + case SPOE_APPCTX_ST_CONNECT: + spoe_status_code = SPOE_FRM_ERR_NONE; + if (si->state <= SI_ST_CON) { + si_applet_want_put(si); + task_wakeup(s->task, TASK_WOKEN_MSG); + break; + } + else if (si->state != SI_ST_EST) { + appctx->st0 = SPOE_APPCTX_ST_EXIT; + on_new_spoe_appctx_failure(agent); + goto switchstate; + } + ret = send_spoe_frame(appctx, &prepare_spoe_hahello_frame); + if (ret < 0) { + appctx->st0 = SPOE_APPCTX_ST_EXIT; + on_new_spoe_appctx_failure(agent); + goto switchstate; + } + else if (!ret) + goto full; + + /* Hello frame was sent. Set the hello timeout and + * wait for the reply. */ + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.hello); + appctx->st0 = SPOE_APPCTX_ST_CONNECTING; + /* fall through */ + + case SPOE_APPCTX_ST_CONNECTING: + if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) { + appctx->st0 = SPOE_APPCTX_ST_EXIT; + on_new_spoe_appctx_failure(agent); + goto switchstate; + } + if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - Connection timed out\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, + __FUNCTION__, appctx); + appctx->st0 = SPOE_APPCTX_ST_EXIT; + on_new_spoe_appctx_failure(agent); + goto switchstate; + } + ret = recv_spoe_frame(appctx, &handle_spoe_agenthello_frame); + if (ret < 0) { + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + on_new_spoe_appctx_failure(agent); + goto switchstate; + } + if (ret == 2) { + appctx->st0 = SPOE_APPCTX_ST_EXIT; + on_new_spoe_appctx_failure(agent); + goto switchstate; + } + if (!ret) + goto out; + + /* hello handshake is finished, set the idle timeout, + * Add the appctx in the agent cache, decrease the + * number of new applets and wake up waiting streams. */ + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); + appctx->st0 = SPOE_APPCTX_ST_PROCESSING; + on_new_spoe_appctx_success(agent, appctx); + break; + + case SPOE_APPCTX_ST_PROCESSING: + if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) { + appctx->st0 = SPOE_APPCTX_ST_EXIT; + goto switchstate; + } + if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { + spoe_status_code = SPOE_FRM_ERR_TOUT; + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + appctx->st1 = SPOE_APPCTX_ERR_NONE; + goto switchstate; + } + if (ctx != NULL && ctx->state == SPOE_CTX_ST_SENDING_MSGS) { + ret = send_spoe_frame(appctx, &prepare_spoe_hanotify_frame); + if (ret < 0) { + if (ret == -1) { + ctx->state = SPOE_CTX_ST_ERROR; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + goto skip_notify_frame; + } + appctx->st0 = SPOE_APPCTX_ST_EXIT; + goto switchstate; + } + else if (!ret) + goto full; + ctx->state = SPOE_CTX_ST_WAITING_ACK; + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.ack); + } + + skip_notify_frame: + if (ctx != NULL && ctx->state == SPOE_CTX_ST_WAITING_ACK) { + ret = recv_spoe_frame(appctx, &handle_spoe_agentack_frame); + if (ret < 0) { + if (ret == -1) + goto skip_notify_frame; + ctx->state = SPOE_CTX_ST_ERROR; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + goto switchstate; + } + if (!ret) + goto out; + if (ret == 2) { + ctx->state = SPOE_CTX_ST_ERROR; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + appctx->st0 = SPOE_APPCTX_ST_EXIT; + goto switchstate; + } + ctx->state = SPOE_CTX_ST_DONE; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); + } + else { + if (stopping) { + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + goto switchstate; + } + + ret = recv_spoe_frame(appctx, NULL); + if (ret < 0) { + if (ret == -1) + goto skip_notify_frame; + appctx->st0 = SPOE_APPCTX_ST_DISCONNECT; + goto switchstate; + } + if (!ret) + goto out; + if (ret == 2) { + appctx->st0 = SPOE_APPCTX_ST_EXIT; + goto switchstate; + } + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.idle); + } + break; + + case SPOE_APPCTX_ST_DISCONNECT: + ret = send_spoe_frame(appctx, &prepare_spoe_hadiscon_frame); + if (ret < 0) { + appctx->st0 = SPOE_APPCTX_ST_EXIT; + goto switchstate; + } + else if (!ret) + goto full; + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: appctx=%p" + " - disconnected by HAProxy (%d): %s\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_agent *)APPCTX_SPOE(appctx).agent)->id, + __FUNCTION__, appctx, spoe_status_code, + spoe_frm_err_reasons[spoe_status_code]); + + APPCTX_SPOE(appctx).task->expire = tick_add_ifset(now_ms, agent->timeout.ack); + appctx->st0 = SPOE_APPCTX_ST_DISCONNECTING; + /* fall through */ + + case SPOE_APPCTX_ST_DISCONNECTING: + if (si->state == SI_ST_CLO || si_opposite(si)->state == SI_ST_CLO) { + appctx->st0 = SPOE_APPCTX_ST_EXIT; + goto switchstate; + } + if (appctx->st1 == SPOE_APPCTX_ERR_TOUT) { + appctx->st0 = SPOE_APPCTX_ST_EXIT; + goto switchstate; + } + ret = recv_spoe_frame(appctx, NULL); + if (ret < 0 || ret == 2) { + appctx->st0 = SPOE_APPCTX_ST_EXIT; + goto switchstate; + } + break; + + case SPOE_APPCTX_ST_EXIT: + si_shutw(si); + si_shutr(si); + si_ic(si)->flags |= CF_READ_NULL; + appctx->st0 = SPOE_APPCTX_ST_END; + APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY; + /* fall through */ + + case SPOE_APPCTX_ST_END: + break; + } + + out: + if (APPCTX_SPOE(appctx).task->expire != TICK_ETERNITY) + task_queue(APPCTX_SPOE(appctx).task); + si_oc(si)->flags |= CF_READ_DONTWAIT; + task_wakeup(si_strm(si)->task, TASK_WOKEN_IO); + return; + full: + si_applet_cant_put(si); + goto out; +} + +struct applet spoe_applet = { + .obj_type = OBJ_TYPE_APPLET, + .name = "", /* used for logging */ + .fct = handle_spoe_applet, + .release = release_spoe_applet, +}; + +/* Create a SPOE applet. On success, the created applet is returned, else + * NULL. */ +static struct appctx * +create_spoe_appctx(struct spoe_config *conf) +{ + struct appctx *appctx; + struct session *sess; + struct task *task; + struct stream *strm; + struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners, + struct listener *, by_fe); + + if ((appctx = appctx_new(&spoe_applet)) == NULL) + goto out_error; + + appctx->st0 = SPOE_APPCTX_ST_CONNECT; + if ((APPCTX_SPOE(appctx).task = task_new()) == NULL) + goto out_free_appctx; + APPCTX_SPOE(appctx).task->process = process_spoe_applet; + APPCTX_SPOE(appctx).task->expire = TICK_ETERNITY; + APPCTX_SPOE(appctx).task->context = appctx; + APPCTX_SPOE(appctx).agent = conf->agent; + APPCTX_SPOE(appctx).ctx = NULL; + APPCTX_SPOE(appctx).version = 0; + APPCTX_SPOE(appctx).max_frame_size = global.tune.bufsize; + task_wakeup(APPCTX_SPOE(appctx).task, TASK_WOKEN_INIT); + + sess = session_new(&conf->agent_fe, l, &appctx->obj_type); + if (!sess) + goto out_free_spoe; + + if ((task = task_new()) == NULL) + goto out_free_sess; + + if ((strm = stream_new(sess, task, &appctx->obj_type)) == NULL) + goto out_free_task; + + strm->target = sess->listener->default_target; + strm->req.analysers |= sess->listener->analysers; + stream_set_backend(strm, conf->agent->b.be); + + /* applet is waiting for data */ + si_applet_cant_get(&strm->si[0]); + appctx_wakeup(appctx); + + /* Increase the number of applets waiting the end of the hello + * handshake. */ + conf->agent->new_applets++; + + strm->do_log = NULL; + strm->res.flags |= CF_READ_DONTWAIT; + + conf->agent_fe.feconn++; + jobs++; + totalconn++; + + return appctx; + + /* Error unrolling */ + out_free_task: + task_free(task); + out_free_sess: + session_free(sess); + out_free_spoe: + task_free(APPCTX_SPOE(appctx).task); + out_free_appctx: + appctx_free(appctx); + out_error: + return NULL; +} + +/* Wake up a SPOE applet attached to a SPOE context. */ +static void +wakeup_spoe_appctx(struct spoe_context *ctx) +{ + if (ctx->appctx == NULL) + return; + if (ctx->appctx->st0 < SPOE_APPCTX_ST_EXIT) { + si_applet_want_get(ctx->appctx->owner); + si_applet_want_put(ctx->appctx->owner); + appctx_wakeup(ctx->appctx); + } +} + + +/* Run across the list of pending streams waiting for a SPOE applet and wake the + * first. */ +static void +offer_spoe_appctx(struct spoe_agent *agent, struct appctx *appctx) +{ + struct spoe_context *ctx; + + if (LIST_ISEMPTY(&agent->applet_wq)) + LIST_ADD(&agent->cache, &APPCTX_SPOE(appctx).list); + else { + ctx = LIST_NEXT(&agent->applet_wq, typeof(ctx), applet_wait); + APPCTX_SPOE(appctx).ctx = ctx; + ctx->appctx = appctx; + LIST_DEL(&ctx->applet_wait); + LIST_INIT(&ctx->applet_wait); + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - wake up stream to get available SPOE applet\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, ctx->strm); + } +} + +/* A failure occurred during SPOE applet creation. */ +static void +on_new_spoe_appctx_failure(struct spoe_agent *agent) +{ + struct spoe_context *ctx; + + agent->new_applets--; + list_for_each_entry(ctx, &agent->applet_wq, applet_wait) { + ctx->errs++; + task_wakeup(ctx->strm->task, TASK_WOKEN_MSG); + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - wake up stream because to SPOE applet connection failed\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, ctx->strm); + } +} + +static void +on_new_spoe_appctx_success(struct spoe_agent *agent, struct appctx *appctx) +{ + agent->new_applets--; + offer_spoe_appctx(agent, appctx); +} +/* Retrieve a SPOE applet from the agent cache if possible, else create it. It + * returns 1 on success, 0 to retry later and -1 if an error occurred. */ +static int +acquire_spoe_appctx(struct spoe_context *ctx, int dir) +{ + struct spoe_config *conf = FLT_CONF(ctx->filter); + struct spoe_agent *agent = conf->agent; + struct appctx *appctx; + + /* If a process is already started for this SPOE context, retry + * later. */ + if (ctx->flags & SPOE_CTX_FL_PROCESS) + goto wait; + + /* If needed, initialize the buffer that will be used to encode messages + * and decode actions. */ + if (ctx->buffer == &buf_empty) { + if (!LIST_ISEMPTY(&ctx->buffer_wait)) { + LIST_DEL(&ctx->buffer_wait); + LIST_INIT(&ctx->buffer_wait); + } + + if (!b_alloc_margin(&ctx->buffer, 0)) { + LIST_ADDQ(&buffer_wq, &ctx->buffer_wait); + goto wait; + } + } + + /* If the SPOE applet was already set, all is done. */ + if (ctx->appctx) + goto success; + + /* Else try to retrieve it from the agent cache */ + if (!LIST_ISEMPTY(&agent->cache)) { + appctx = LIST_NEXT(&agent->cache, typeof(appctx), ctx.spoe.list); + LIST_DEL(&APPCTX_SPOE(appctx).list); + APPCTX_SPOE(appctx).ctx = ctx; + ctx->appctx = appctx; + goto success; + } + + /* If there is no server up for the agent's backend or it too many + * failure occurred, this is an error. */ + if ((!agent->b.be->srv_act && !agent->b.be->srv_bck) || + ctx->errs >= MAX_NEW_SPOE_APPLET_ERRS) + goto error; + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - waiting for available SPOE appctx\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, __FUNCTION__, + ctx->strm); + + /* Else add the stream in the waiting queue. */ + if (LIST_ISEMPTY(&ctx->applet_wait)) + LIST_ADDQ(&agent->applet_wq, &ctx->applet_wait); + + /* Finally, create new SPOE applet if we can */ + if (agent->new_applets < MAX_NEW_SPOE_APPLETS) { + if (create_spoe_appctx(conf) == NULL) + goto error; + } + + wait: + return 0; + + success: + /* Remove the stream from the waiting queue */ + if (!LIST_ISEMPTY(&ctx->applet_wait)) { + LIST_DEL(&ctx->applet_wait); + LIST_INIT(&ctx->applet_wait); + } + + /* Set the right flag to prevent request and response processing + * in same time. */ + ctx->flags |= ((dir == SMP_OPT_DIR_REQ) + ? SPOE_CTX_FL_REQ_PROCESS + : SPOE_CTX_FL_RSP_PROCESS); + ctx->errs = 0; + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - acquire SPOE appctx %p from cache\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, ctx->strm, ctx->appctx); + return 1; + + error: + /* Remove the stream from the waiting queue */ + if (!LIST_ISEMPTY(&ctx->applet_wait)) { + LIST_DEL(&ctx->applet_wait); + LIST_INIT(&ctx->applet_wait); + } + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - failed to acquire SPOE appctx errs=%u\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, ctx->strm, ctx->errs); + send_log(ctx->strm->be, LOG_WARNING, "failed to acquire SPOE applet.\n"); + + return -1; +} + +/* Release a SPOE applet and push it in the agent cache. */ +static void +release_spoe_appctx(struct spoe_context *ctx) +{ + struct spoe_config *conf = FLT_CONF(ctx->filter); + struct spoe_agent *agent = conf->agent; + struct appctx *appctx = ctx->appctx; + + /* Reset the flag to allow next processing */ + ctx->flags &= ~SPOE_CTX_FL_PROCESS; + + /* Release the buffer if needed */ + if (ctx->buffer != &buf_empty) { + b_free(&ctx->buffer); + if (!LIST_ISEMPTY(&buffer_wq)) + stream_offer_buffers(); + } + + /* If there is no SPOE applet, all is done */ + if (!appctx) + return; + + /* Else, reassign it or push it in the agent cache */ + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - release SPOE appctx %p\n", + (int)now.tv_sec, (int)now.tv_usec, agent->id, + __FUNCTION__, ctx->strm, appctx); + + APPCTX_SPOE(appctx).ctx = NULL; + ctx->appctx = NULL; + offer_spoe_appctx(agent, appctx); +} + +/*************************************************************************** + * Functions that process SPOE messages and actions + **************************************************************************/ +/* Process SPOE messages for a specific event. During the processing, it returns + * 0 and it returns 1 when the processing is finished. If an error occurred, -1 + * is returned. */ +static int +process_spoe_messages(struct stream *s, struct spoe_context *ctx, + struct list *messages, int dir) +{ + struct spoe_message *msg; + struct sample *smp; + struct spoe_arg *arg; + char *p; + size_t max_size; + int off, flag, idx = 0; + + /* Reserve 32 bytes from the frame Metadata */ + max_size = APPCTX_SPOE(ctx->appctx).max_frame_size - 32; + + b_reset(ctx->buffer); + p = ctx->buffer->p; + + /* Loop on messages */ + list_for_each_entry(msg, messages, list) { + if (idx + msg->id_len + 1 > max_size) + goto skip; + + /* Set the message name */ + idx += encode_spoe_string(msg->id, msg->id_len, p+idx); + + /* Save offset where to store the number of arguments for this + * message */ + off = idx++; + p[off] = 0; + + /* Loop on arguments */ + list_for_each_entry(arg, &msg->args, list) { + p[off]++; /* Increment the number of arguments */ + + if (idx + arg->name_len + 1 > max_size) + goto skip; + + /* Encode the arguement name as a string. It can by NULL */ + idx += encode_spoe_string(arg->name, arg->name_len, p+idx); + + /* Fetch the arguement value */ + smp = sample_process(s->be, s->sess, s, dir|SMP_OPT_FINAL, arg->expr, NULL); + if (!smp) { + /* If no value is available, set it to NULL */ + p[idx++] = SPOE_DATA_T_NULL; + continue; + } + + /* Else, encode the arguement value */ + switch (smp->data.type) { + case SMP_T_BOOL: + flag = ((!smp->data.u.sint) ? SPOE_DATA_FL_FALSE : SPOE_DATA_FL_TRUE); + p[idx++] = (SPOE_DATA_T_BOOL | flag); + break; + case SMP_T_SINT: + p[idx++] = SPOE_DATA_T_INT64; + if (idx + 8 > max_size) + goto skip; + idx += encode_spoe_varint(smp->data.u.sint, p+idx); + break; + case SMP_T_IPV4: + p[idx++] = SPOE_DATA_T_IPV4; + if (idx + 4 > max_size) + goto skip; + memcpy(p+idx, &smp->data.u.ipv4, 4); + idx += 4; + break; + case SMP_T_IPV6: + p[idx++] = SPOE_DATA_T_IPV6; + if (idx + 16 > max_size) + goto skip; + memcpy(p+idx, &smp->data.u.ipv6, 16); + idx += 16; + break; + case SMP_T_STR: + p[idx++] = SPOE_DATA_T_STR; + if (idx + smp->data.u.str.len > max_size) + goto skip; + idx += encode_spoe_string(smp->data.u.str.str, + smp->data.u.str.len, + p+idx); + break; + case SMP_T_BIN: + p[idx++] = SPOE_DATA_T_BIN; + if (idx + smp->data.u.str.len > max_size) + goto skip; + idx += encode_spoe_string(smp->data.u.str.str, + smp->data.u.str.len, + p+idx); + break; + case SMP_T_METH: + if (smp->data.u.meth.meth == HTTP_METH_OTHER) { + p[idx++] = SPOE_DATA_T_STR; + if (idx + http_known_methods[smp->data.u.meth.meth].len > max_size) + goto skip; + idx += encode_spoe_string(http_known_methods[smp->data.u.meth.meth].name, + http_known_methods[smp->data.u.meth.meth].len, + p+idx); + } + else { + p[idx++] = SPOE_DATA_T_STR; + if (idx + smp->data.u.str.len > max_size) + goto skip; + idx += encode_spoe_string(smp->data.u.meth.str.str, + smp->data.u.meth.str.len, + p+idx); + } + break; + default: + p[idx++] = SPOE_DATA_T_NULL; + } + } + } + ctx->buffer->i = idx; + return 1; + + skip: + b_reset(ctx->buffer); + return 0; +} + +/* Helper function to set a variable */ +static void +set_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len, + struct sample *smp) +{ + struct spoe_config *conf = FLT_CONF(ctx->filter); + struct spoe_agent *agent = conf->agent; + char varname[64]; + + memset(varname, 0, sizeof(varname)); + len = snprintf(varname, sizeof(varname), "%s.%s.%.*s", + scope, agent->var_pfx, len, name); + vars_set_by_name_ifexist(varname, len, smp); +} + +/* Helper function to unset a variable */ +static void +unset_spoe_var(struct spoe_context *ctx, char *scope, char *name, int len, + struct sample *smp) +{ + struct spoe_config *conf = FLT_CONF(ctx->filter); + struct spoe_agent *agent = conf->agent; + char varname[64]; + + memset(varname, 0, sizeof(varname)); + len = snprintf(varname, sizeof(varname), "%s.%s.%.*s", + scope, agent->var_pfx, len, name); + vars_unset_by_name_ifexist(varname, len, smp); +} + + +/* Process SPOE actions for a specific event. During the processing, it returns + * 0 and it returns 1 when the processing is finished. If an error occurred, -1 + * is returned. */ +static int +process_spoe_actions(struct stream *s, struct spoe_context *ctx, + enum spoe_event ev, int dir) +{ + char *p; + size_t size; + int off, i, idx = 0; + + p = ctx->buffer->p; + size = ctx->buffer->i; + + while (idx < size) { + char *str; + uint64_t sz; + struct sample smp; + enum spoe_action_type type; + + off = idx; + if (idx+2 > size) + goto skip; + + type = p[idx++]; + switch (type) { + case SPOE_ACT_T_SET_VAR: { + char *scope; + + if (p[idx++] != 3) + goto skip_action; + + switch (p[idx++]) { + case SPOE_SCOPE_PROC: scope = "proc"; break; + case SPOE_SCOPE_SESS: scope = "sess"; break; + case SPOE_SCOPE_TXN : scope = "txn"; break; + case SPOE_SCOPE_REQ : scope = "req"; break; + case SPOE_SCOPE_RES : scope = "res"; break; + default: goto skip; + } + + idx += decode_spoe_string(p+idx, p+size, &str, &sz); + if (str == NULL) + goto skip; + memset(&smp, 0, sizeof(smp)); + smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL); + if (decode_spoe_data(p+idx, p+size, &smp) == -1) + goto skip; + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - set-var '%s.%s.%.*s'\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id, + __FUNCTION__, s, scope, + ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx, + (int)sz, str); + + set_spoe_var(ctx, scope, str, sz, &smp); + break; + } + + case SPOE_ACT_T_UNSET_VAR: { + char *scope; + + if (p[idx++] != 2) + goto skip_action; + + switch (p[idx++]) { + case SPOE_SCOPE_PROC: scope = "proc"; break; + case SPOE_SCOPE_SESS: scope = "sess"; break; + case SPOE_SCOPE_TXN : scope = "txn"; break; + case SPOE_SCOPE_REQ : scope = "req"; break; + case SPOE_SCOPE_RES : scope = "res"; break; + default: goto skip; + } + + idx += decode_spoe_string(p+idx, p+size, &str, &sz); + if (str == NULL) + goto skip; + memset(&smp, 0, sizeof(smp)); + smp_set_owner(&smp, s->be, s->sess, s, dir|SMP_OPT_FINAL); + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - unset-var '%s.%s.%.*s'\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id, + __FUNCTION__, s, scope, + ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->var_pfx, + (int)sz, str); + + unset_spoe_var(ctx, scope, str, sz, &smp); + break; + } + + default: + skip_action: + if ((i = skip_spoe_action(p+off, p+size)) == -1) + goto skip; + idx += i; + } + } + + return 1; + skip: + return 0; +} + + +/* Process a SPOE event. First, this functions will process messages attached to + * this event and send them to an agent in a NOTIFY frame. Then, it will wait a + * ACK frame to process corresponding actions. During all the processing, it + * returns 0 and it returns 1 when the processing is finished. If an error + * occurred, -1 is returned. */ +static int +process_spoe_event(struct stream *s, struct spoe_context *ctx, + enum spoe_event ev) +{ + int dir, ret = 1; + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p" + " - ctx-state=%s - event=%s\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_config *)FLT_CONF(ctx->filter))->agent->id, + __FUNCTION__, s, spoe_ctx_state_str[ctx->state], + spoe_event_str[ev]); + + dir = ((ev < SPOE_EV_ON_SERVER_SESS) ? SMP_OPT_DIR_REQ : SMP_OPT_DIR_RES); + + if (LIST_ISEMPTY(&(ctx->messages[ev]))) + goto out; + + if (ctx->state == SPOE_CTX_ST_ERROR) + goto error; + + if (ctx->state == SPOE_CTX_ST_READY) { + ret = acquire_spoe_appctx(ctx, dir); + if (ret <= 0) { + if (!ret) + goto out; + goto error; + } + ctx->state = SPOE_CTX_ST_SENDING_MSGS; + } + + if (ctx->appctx == NULL) + goto error; + + if (ctx->state == SPOE_CTX_ST_SENDING_MSGS) { + ret = process_spoe_messages(s, ctx, &(ctx->messages[ev]), dir); + if (ret <= 0) { + if (!ret) + goto skip; + goto error; + } + wakeup_spoe_appctx(ctx); + ret = 0; + goto out; + } + + if (ctx->state == SPOE_CTX_ST_WAITING_ACK) { + wakeup_spoe_appctx(ctx); + ret = 0; + goto out; + } + + if (ctx->state == SPOE_CTX_ST_DONE) { + ret = process_spoe_actions(s, ctx, ev, dir); + if (ret <= 0) { + if (!ret) + goto skip; + goto error; + } + ctx->frame_id++; + release_spoe_appctx(ctx); + ctx->state = SPOE_CTX_ST_READY; + } + + out: + return ret; + + skip: + release_spoe_appctx(ctx); + ctx->state = SPOE_CTX_ST_READY; + return 1; + + error: + release_spoe_appctx(ctx); + ctx->state = SPOE_CTX_ST_ERROR; + return 1; +} + + +/*************************************************************************** + * Functions that create/destroy SPOE contexts + **************************************************************************/ +static struct spoe_context * +create_spoe_context(struct filter *filter) +{ + struct spoe_config *conf = FLT_CONF(filter); + struct spoe_context *ctx; + + ctx = pool_alloc_dirty(pool2_spoe_ctx); + if (ctx == NULL) { + return NULL; + } + memset(ctx, 0, sizeof(*ctx)); + ctx->filter = filter; + ctx->state = SPOE_CTX_ST_NONE; + ctx->flags = 0; + ctx->errs = 0; + ctx->messages = conf->agent->messages; + ctx->buffer = &buf_empty; + LIST_INIT(&ctx->buffer_wait); + LIST_INIT(&ctx->applet_wait); + + ctx->stream_id = 0; + ctx->frame_id = 1; + + return ctx; +} + +static void +destroy_spoe_context(struct spoe_context *ctx) +{ + if (!ctx) + return; + + if (ctx->appctx) + APPCTX_SPOE(ctx->appctx).ctx = NULL; + if (!LIST_ISEMPTY(&ctx->buffer_wait)) + LIST_DEL(&ctx->buffer_wait); + if (!LIST_ISEMPTY(&ctx->applet_wait)) + LIST_DEL(&ctx->applet_wait); + pool_free2(pool2_spoe_ctx, ctx); +} + +static void +reset_spoe_context(struct spoe_context *ctx) +{ + ctx->state = SPOE_CTX_ST_READY; + ctx->flags &= ~SPOE_CTX_FL_PROCESS; +} + + +/*************************************************************************** + * Hooks that manage the filter lifecycle (init/check/deinit) + **************************************************************************/ +/* Signal handler: Do a soft stop, wakeup SPOE applet */ +static void +sig_stop_spoe(struct sig_handler *sh) +{ + struct proxy *p; + + p = proxy; + while (p) { + struct flt_conf *fconf; + + list_for_each_entry(fconf, &p->filter_configs, list) { + struct spoe_config *conf = fconf->conf; + struct spoe_agent *agent = conf->agent; + struct appctx *appctx; + + list_for_each_entry(appctx, &agent->cache, ctx.spoe.list) { + si_applet_want_get(appctx->owner); + si_applet_want_put(appctx->owner); + appctx_wakeup(appctx); + } + } + p = p->next; + } +} + + +/* Initialize the SPOE filter. Returns -1 on error, else 0. */ +static int +spoe_init(struct proxy *px, struct flt_conf *fconf) +{ + struct spoe_config *conf = fconf->conf; + struct listener *l; + + memset(&conf->agent_fe, 0, sizeof(conf->agent_fe)); + init_new_proxy(&conf->agent_fe); + conf->agent_fe.parent = conf->agent; + conf->agent_fe.last_change = now.tv_sec; + conf->agent_fe.id = conf->agent->id; + conf->agent_fe.cap = PR_CAP_FE; + conf->agent_fe.mode = PR_MODE_TCP; + conf->agent_fe.maxconn = 0; + conf->agent_fe.options2 |= PR_O2_INDEPSTR; + conf->agent_fe.conn_retries = CONN_RETRIES; + conf->agent_fe.accept = frontend_accept; + conf->agent_fe.srv = NULL; + conf->agent_fe.timeout.client = TICK_ETERNITY; + conf->agent_fe.default_target = &spoe_applet.obj_type; + conf->agent_fe.fe_req_ana = AN_REQ_SWITCHING_RULES; + + if ((l = calloc(1, sizeof(*l))) == NULL) { + Alert("spoe_init : out of memory.\n"); + goto out_error; + } + l->obj_type = OBJ_TYPE_LISTENER; + l->obj_type = OBJ_TYPE_LISTENER; + l->frontend = &conf->agent_fe; + l->state = LI_READY; + l->analysers = conf->agent_fe.fe_req_ana; + LIST_ADDQ(&conf->agent_fe.conf.listeners, &l->by_fe); + + if (!sighandler_registered) { + signal_register_fct(0, sig_stop_spoe, 0); + sighandler_registered = 1; + } + + return 0; + + out_error: + return -1; +} + +/* Free ressources allocated by the SPOE filter. */ +static void +spoe_deinit(struct proxy *px, struct flt_conf *fconf) +{ + struct spoe_config *conf = fconf->conf; + + if (conf) { + struct spoe_agent *agent = conf->agent; + struct listener *l = LIST_NEXT(&conf->agent_fe.conf.listeners, + struct listener *, by_fe); + + free(l); + release_spoe_agent(agent); + free(conf); + } + fconf->conf = NULL; +} + +/* Check configuration of a SPOE filter for a specified proxy. + * Return 1 on error, else 0. */ +static int +spoe_check(struct proxy *px, struct flt_conf *fconf) +{ + struct spoe_config *conf = fconf->conf; + struct proxy *target; + + target = proxy_be_by_name(conf->agent->b.name); + if (target == NULL) { + Alert("Proxy %s : unknown backend '%s' used by SPOE agent '%s'" + " declared at %s:%d.\n", + px->id, conf->agent->b.name, conf->agent->id, + conf->agent->conf.file, conf->agent->conf.line); + return 1; + } + if (target->mode != PR_MODE_TCP) { + Alert("Proxy %s : backend '%s' used by SPOE agent '%s' declared" + " at %s:%d does not support HTTP mode.\n", + px->id, target->id, conf->agent->id, + conf->agent->conf.file, conf->agent->conf.line); + return 1; + } + + free(conf->agent->b.name); + conf->agent->b.name = NULL; + conf->agent->b.be = target; + return 0; +} + +/************************************************************************** + * Hooks attached to a stream + *************************************************************************/ +/* Called when a filter instance is created and attach to a stream. It creates + * the context that will be used to process this stream. */ +static int +spoe_start(struct stream *s, struct filter *filter) +{ + struct spoe_context *ctx; + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_config *)FLT_CONF(filter))->agent->id, + __FUNCTION__, s); + + ctx = create_spoe_context(filter); + if (ctx == NULL) { + send_log(s->be, LOG_EMERG, + "failed to create SPOE context for proxy %s\n", + s->be->id); + return 0; + } + + ctx->strm = s; + ctx->state = SPOE_CTX_ST_READY; + filter->ctx = ctx; + + if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_FE])) + filter->pre_analyzers |= AN_REQ_INSPECT_FE; + + if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_REQ_BE])) + filter->pre_analyzers |= AN_REQ_INSPECT_BE; + + if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_TCP_RSP])) + filter->pre_analyzers |= AN_RES_INSPECT; + + if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_FE])) + filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_FE; + + if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_REQ_BE])) + filter->pre_analyzers |= AN_REQ_HTTP_PROCESS_BE; + + if (!LIST_ISEMPTY(&ctx->messages[SPOE_EV_ON_HTTP_RSP])) + filter->pre_analyzers |= AN_RES_HTTP_PROCESS_FE; + + return 1; +} + +/* Called when a filter instance is detached from a stream. It release the + * attached SPOE context. */ +static void +spoe_stop(struct stream *s, struct filter *filter) +{ + struct spoe_context *ctx = filter->ctx; + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_config *)FLT_CONF(filter))->agent->id, + __FUNCTION__, s); + + if (ctx) { + release_spoe_appctx(ctx); + destroy_spoe_context(ctx); + } +} + +/* Called when we are ready to filter data on a channel */ +static int +spoe_start_analyze(struct stream *s, struct filter *filter, struct channel *chn) +{ + struct spoe_context *ctx = filter->ctx; + int ret = 1; + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s" + " - ctx-flags=0x%08x\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_config *)FLT_CONF(filter))->agent->id, + __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags); + + if (!(chn->flags & CF_ISRESP)) { + if (filter->pre_analyzers & AN_REQ_INSPECT_FE) + chn->analysers |= AN_REQ_INSPECT_FE; + if (filter->pre_analyzers & AN_REQ_INSPECT_BE) + chn->analysers |= AN_REQ_INSPECT_BE; + + if (ctx->flags & SPOE_CTX_FL_CLI_CONNECTED) + goto out; + + ctx->stream_id = s->uniq_id; + if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) { + ret = process_spoe_event(s, ctx, SPOE_EV_ON_CLIENT_SESS); + if (ret != 1) + goto out; + } + ctx->flags |= SPOE_CTX_FL_CLI_CONNECTED; + } + else { + if (filter->pre_analyzers & SPOE_EV_ON_TCP_RSP) + chn->analysers |= AN_RES_INSPECT; + + if (ctx->flags & SPOE_CTX_FL_SRV_CONNECTED) + goto out; + + if (ctx->state != SPOE_CTX_ST_NONE && ctx->state != SPOE_CTX_ST_ERROR) { + ret = process_spoe_event(s, ctx, SPOE_EV_ON_SERVER_SESS); + if (ret != 1) + goto out; + } + ctx->flags |= SPOE_CTX_FL_SRV_CONNECTED; + } + + out: + if (!ret) { + channel_dont_read(chn); + channel_dont_close(chn); + } + return ret; +} + +/* Called before a processing happens on a given channel */ +static int +spoe_chn_pre_analyze(struct stream *s, struct filter *filter, + struct channel *chn, unsigned an_bit) +{ + struct spoe_context *ctx = filter->ctx; + int ret = 1; + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s" + " - ctx-flags=0x%08x - ana=0x%08x\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_config *)FLT_CONF(filter))->agent->id, + __FUNCTION__, s, spoe_ctx_state_str[ctx->state], + ctx->flags, an_bit); + + if (ctx->state == SPOE_CTX_ST_NONE || ctx->state == SPOE_CTX_ST_ERROR) + goto out; + + switch (an_bit) { + case AN_REQ_INSPECT_FE: + ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_FE); + break; + case AN_REQ_INSPECT_BE: + ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_REQ_BE); + break; + case AN_RES_INSPECT: + ret = process_spoe_event(s, ctx, SPOE_EV_ON_TCP_RSP); + break; + case AN_REQ_HTTP_PROCESS_FE: + ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_FE); + break; + case AN_REQ_HTTP_PROCESS_BE: + ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_REQ_BE); + break; + case AN_RES_HTTP_PROCESS_FE: + ret = process_spoe_event(s, ctx, SPOE_EV_ON_HTTP_RSP); + break; + } + + out: + if (!ret) { + channel_dont_read(chn); + channel_dont_close(chn); + } + return ret; +} + +/* Called when the filtering on the channel ends. */ +static int +spoe_end_analyze(struct stream *s, struct filter *filter, struct channel *chn) +{ + struct spoe_context *ctx = filter->ctx; + + SPOE_PRINTF(stderr, "%d.%06d [SPOE/%-15s] %s: stream=%p - ctx-state=%s" + " - ctx-flags=0x%08x\n", + (int)now.tv_sec, (int)now.tv_usec, + ((struct spoe_config *)FLT_CONF(filter))->agent->id, + __FUNCTION__, s, spoe_ctx_state_str[ctx->state], ctx->flags); + + if (!(ctx->flags & SPOE_CTX_FL_PROCESS)) { + reset_spoe_context(ctx); + } + + return 1; +} + +/******************************************************************** + * Functions that manage the filter initialization + ********************************************************************/ +struct flt_ops spoe_ops = { + /* Manage SPOE filter, called for each filter declaration */ + .init = spoe_init, + .deinit = spoe_deinit, + .check = spoe_check, + + /* Handle start/stop of SPOE */ + .attach = spoe_start, + .detach = spoe_stop, + + /* Handle channels activity */ + .channel_start_analyze = spoe_start_analyze, + .channel_pre_analyze = spoe_chn_pre_analyze, + .channel_end_analyze = spoe_end_analyze, +}; + + +static int +cfg_parse_spoe_agent(const char *file, int linenum, char **args, int kwm) +{ + const char *err; + int i, err_code = 0; + + if ((cfg_scope == NULL && curengine != NULL) || + (cfg_scope != NULL && curengine == NULL) || + strcmp(curengine, cfg_scope)) + goto out; + + if (!strcmp(args[0], "spoe-agent")) { /* new spoe-agent section */ + if (!*args[1]) { + Alert("parsing [%s:%d] : missing name for spoe-agent section.\n", + file, linenum); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + if (*args[2]) { + Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n", + file, linenum, args[2]); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + + err = invalid_char(args[1]); + if (err) { + Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n", + file, linenum, *err, args[0], args[1]); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + + if (curagent != NULL) { + Alert("parsing [%s:%d] : another spoe-agent section previously defined.\n", + file, linenum); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + if ((curagent = calloc(1, sizeof(*curagent))) == NULL) { + Alert("parsing [%s:%d] : out of memory.\n", file, linenum); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + + curagent->id = strdup(args[1]); + curagent->conf.file = strdup(file); + curagent->conf.line = linenum; + curagent->timeout.hello = TICK_ETERNITY; + curagent->timeout.ack = TICK_ETERNITY; + curagent->timeout.idle = TICK_ETERNITY; + curagent->var_pfx = NULL; + curagent->new_applets = 0; + + for (i = 0; i < SPOE_EV_EVENTS; ++i) + LIST_INIT(&curagent->messages[i]); + LIST_INIT(&curagent->cache); + LIST_INIT(&curagent->applet_wq); + } + else if (!strcmp(args[0], "use-backend")) { + if (!*args[1]) { + Alert("parsing [%s:%d] : '%s' expects a backend name.\n", + file, linenum, args[0]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + if (*args[2]) { + Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n", + file, linenum, args[2]); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + free(curagent->b.name); + curagent->b.name = strdup(args[1]); + } + else if (!strcmp(args[0], "messages")) { + int cur_arg = 1; + while (*args[cur_arg]) { + struct spoe_msg_placeholder *mp = NULL; + + list_for_each_entry(mp, &curmps, list) { + if (!strcmp(mp->id, args[cur_arg])) { + Alert("parsing [%s:%d]: spoe-message message '%s' already declared.\n", + file, linenum, args[cur_arg]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + } + + if ((mp = calloc(1, sizeof(*mp))) == NULL) { + Alert("parsing [%s:%d] : out of memory.\n", file, linenum); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + mp->id = strdup(args[cur_arg]); + LIST_ADDQ(&curmps, &mp->list); + cur_arg++; + } + } + else if (!strcmp(args[0], "timeout")) { + unsigned int *tv = NULL; + const char *res; + unsigned timeout; + + if (!*args[1]) { + Alert("parsing [%s:%d] : 'timeout' expects 'connect', 'idle' and 'ack'.\n", + file, linenum); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + if (!strcmp(args[1], "hello")) + tv = &curagent->timeout.hello; + else if (!strcmp(args[1], "idle")) + tv = &curagent->timeout.idle; + else if (!strcmp(args[1], "ack")) + tv = &curagent->timeout.ack; + else { + Alert("parsing [%s:%d] : 'timeout' supports 'connect', 'idle' and 'ack' (got %s).\n", + file, linenum, args[1]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + if (!*args[2]) { + Alert("parsing [%s:%d] : 'timeout %s' expects an integer value (in milliseconds).\n", + file, linenum, args[1]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + res = parse_time_err(args[2], &timeout, TIME_UNIT_MS); + if (res) { + Alert("parsing [%s:%d] : unexpected character '%c' in 'timeout %s'.\n", + file, linenum, *res, args[1]); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + if (*args[3]) { + Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n", + file, linenum, args[3]); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + *tv = MS_TO_TICKS(timeout); + } + else if (!strcmp(args[0], "option")) { + if (!*args[1]) { + Alert("parsing [%s:%d]: '%s' expects an option name.\n", + file, linenum, args[0]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + if (!strcmp(args[1], "var-prefix")) { + char *tmp; + + if (!*args[2]) { + Alert("parsing [%s:%d]: '%s %s' expects a value.\n", + file, linenum, args[0], + args[1]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + tmp = args[2]; + while (*tmp) { + if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') { + Alert("parsing [%s:%d]: '%s %s' only supports [a-zA-Z_-.] chars.\n", + file, linenum, args[0], args[1]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + tmp++; + } + curagent->var_pfx = strdup(args[2]); + } + else { + Alert("parsing [%s:%d]: option '%s' is not supported.\n", + file, linenum, args[1]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + } + else if (*args[0]) { + Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-agent section.\n", + file, linenum, args[0]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + out: + return err_code; +} + +static int +cfg_parse_spoe_message(const char *file, int linenum, char **args, int kwm) +{ + struct spoe_message *msg; + struct spoe_arg *arg; + const char *err; + char *errmsg = NULL; + int err_code = 0; + + if ((cfg_scope == NULL && curengine != NULL) || + (cfg_scope != NULL && curengine == NULL) || + strcmp(curengine, cfg_scope)) + goto out; + + if (!strcmp(args[0], "spoe-message")) { /* new spoe-message section */ + if (!*args[1]) { + Alert("parsing [%s:%d] : missing name for spoe-message section.\n", + file, linenum); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + if (*args[2]) { + Alert("parsing [%s:%d] : cannot handle unexpected argument '%s'.\n", + file, linenum, args[2]); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + + err = invalid_char(args[1]); + if (err) { + Alert("parsing [%s:%d] : character '%c' is not permitted in '%s' name '%s'.\n", + file, linenum, *err, args[0], args[1]); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + + list_for_each_entry(msg, &curmsgs, list) { + if (!strcmp(msg->id, args[1])) { + Alert("parsing [%s:%d]: spoe-message section '%s' has the same" + " name as another one declared at %s:%d.\n", + file, linenum, args[1], msg->conf.file, msg->conf.line); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + } + + if ((curmsg = calloc(1, sizeof(*curmsg))) == NULL) { + Alert("parsing [%s:%d] : out of memory.\n", file, linenum); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + + curmsg->id = strdup(args[1]); + curmsg->id_len = strlen(curmsg->id); + curmsg->event = SPOE_EV_NONE; + curmsg->conf.file = strdup(file); + curmsg->conf.line = linenum; + LIST_INIT(&curmsg->args); + LIST_ADDQ(&curmsgs, &curmsg->list); + } + else if (!strcmp(args[0], "args")) { + int cur_arg = 1; + + curproxy->conf.args.ctx = ARGC_SPOE; + curproxy->conf.args.file = file; + curproxy->conf.args.line = linenum; + while (*args[cur_arg]) { + char *delim = strchr(args[cur_arg], '='); + int idx = 0; + + if ((arg = calloc(1, sizeof(*arg))) == NULL) { + Alert("parsing [%s:%d] : out of memory.\n", file, linenum); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + + if (!delim) { + arg->name = NULL; + arg->name_len = 0; + delim = args[cur_arg]; + } + else { + arg->name = my_strndup(args[cur_arg], delim - args[cur_arg]); + arg->name_len = delim - args[cur_arg]; + delim++; + } + + arg->expr = sample_parse_expr(&delim, &idx, file, linenum, &errmsg, &curproxy->conf.args); + if (arg->expr == NULL) { + Alert("parsing [%s:%d] : '%s': %s.\n", file, linenum, args[0], errmsg); + err_code |= ERR_ALERT | ERR_FATAL; + free(arg->name); + free(arg); + goto out; + } + LIST_ADDQ(&curmsg->args, &arg->list); + cur_arg++; + } + curproxy->conf.args.file = NULL; + curproxy->conf.args.line = 0; + } + else if (!strcmp(args[0], "event")) { + if (!*args[1]) { + Alert("parsing [%s:%d] : missing event name.\n", file, linenum); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_CLIENT_SESS])) + curmsg->event = SPOE_EV_ON_CLIENT_SESS; + else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_SERVER_SESS])) + curmsg->event = SPOE_EV_ON_SERVER_SESS; + + else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_FE])) + curmsg->event = SPOE_EV_ON_TCP_REQ_FE; + else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_REQ_BE])) + curmsg->event = SPOE_EV_ON_TCP_REQ_BE; + else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_TCP_RSP])) + curmsg->event = SPOE_EV_ON_TCP_RSP; + + else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_FE])) + curmsg->event = SPOE_EV_ON_HTTP_REQ_FE; + else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_REQ_BE])) + curmsg->event = SPOE_EV_ON_HTTP_REQ_BE; + else if (!strcmp(args[1], spoe_event_str[SPOE_EV_ON_HTTP_RSP])) + curmsg->event = SPOE_EV_ON_HTTP_RSP; + else { + Alert("parsing [%s:%d] : unkown event '%s'.\n", + file, linenum, args[1]); + err_code |= ERR_ALERT | ERR_ABORT; + goto out; + } + } + else if (!*args[0]) { + Alert("parsing [%s:%d] : unknown keyword '%s' in spoe-message section.\n", + file, linenum, args[0]); + err_code |= ERR_ALERT | ERR_FATAL; + goto out; + } + out: + free(errmsg); + return err_code; +} + +/* Return -1 on error, else 0 */ +static int +parse_spoe_flt(char **args, int *cur_arg, struct proxy *px, + struct flt_conf *fconf, char **err, void *private) +{ + struct list backup_sections; + struct spoe_config *conf; + struct spoe_message *msg, *msgback; + struct spoe_msg_placeholder *mp, *mpback; + char *file = NULL, *engine = NULL; + int ret, pos = *cur_arg + 1; + + conf = calloc(1, sizeof(*conf)); + if (conf == NULL) { + memprintf(err, "%s: out of memory", args[*cur_arg]); + goto error; + } + conf->proxy = px; + + while (*args[pos]) { + if (!strcmp(args[pos], "config")) { + if (!*args[pos+1]) { + memprintf(err, "'%s' : '%s' option without value", + args[*cur_arg], args[pos]); + goto error; + } + file = args[pos+1]; + pos += 2; + } + else if (!strcmp(args[pos], "engine")) { + if (!*args[pos+1]) { + memprintf(err, "'%s' : '%s' option without value", + args[*cur_arg], args[pos]); + goto error; + } + engine = args[pos+1]; + pos += 2; + } + else { + memprintf(err, "unknown keyword '%s'", args[pos]); + goto error; + } + } + if (file == NULL) { + memprintf(err, "'%s' : missing config file", args[*cur_arg]); + goto error; + } + + /* backup sections and register SPOE sections */ + LIST_INIT(&backup_sections); + cfg_backup_sections(&backup_sections); + cfg_register_section("spoe-agent", cfg_parse_spoe_agent); + cfg_register_section("spoe-message", cfg_parse_spoe_message); + + /* Parse SPOE filter configuration file */ + curengine = engine; + curproxy = px; + curagent = NULL; + curmsg = NULL; + ret = readcfgfile(file); + curproxy = NULL; + + /* unregister SPOE sections and restore previous sections */ + cfg_unregister_sections(); + cfg_restore_sections(&backup_sections); + + if (ret == -1) { + memprintf(err, "Could not open configuration file %s : %s", + file, strerror(errno)); + goto error; + } + if (ret & (ERR_ABORT|ERR_FATAL)) { + memprintf(err, "Error(s) found in configuration file %s", file); + goto error; + } + + /* Check SPOE agent */ + if (curagent == NULL) { + memprintf(err, "No SPOE agent found in file %s", file); + goto error; + } + if (curagent->b.name == NULL) { + memprintf(err, "No backend declared for SPOE agent '%s' declared at %s:%d", + curagent->id, curagent->conf.file, curagent->conf.line); + goto error; + } + if (curagent->timeout.hello == TICK_ETERNITY || + curagent->timeout.idle == TICK_ETERNITY || + curagent->timeout.ack == TICK_ETERNITY) { + Warning("Proxy '%s': missing timeouts for SPOE agent '%s' declare at %s:%d.\n" + " | While not properly invalid, you will certainly encounter various problems\n" + " | with such a configuration. To fix this, please ensure that all following\n" + " | timeouts are set to a non-zero value: 'hello', 'idle', 'ack'.\n", + px->id, curagent->id, curagent->conf.file, curagent->conf.line); + } + if (curagent->var_pfx == NULL) { + char *tmp = curagent->id; + + while (*tmp) { + if (!isalnum(*tmp) && *tmp != '_' && *tmp != '.') { + memprintf(err, "Invalid variable prefix '%s' for SPOE agent '%s' declared at %s:%d. " + "Use 'option var-prefix' to set it. Only [a-zA-Z0-9_.] chars are supported.\n", + curagent->id, curagent->id, curagent->conf.file, curagent->conf.line); + goto error; + } + tmp++; + } + curagent->var_pfx = strdup(curagent->id); + } + + if (LIST_ISEMPTY(&curmps)) { + Warning("Proxy '%s': No message used by SPOE agent '%s' declared at %s:%d.\n", + px->id, curagent->id, curagent->conf.file, curagent->conf.line); + goto finish; + } + + list_for_each_entry_safe(mp, mpback, &curmps, list) { + list_for_each_entry_safe(msg, msgback, &curmsgs, list) { + if (!strcmp(msg->id, mp->id)) { + if ((px->cap & (PR_CAP_FE|PR_CAP_BE)) == (PR_CAP_FE|PR_CAP_BE)) { + if (msg->event == SPOE_EV_ON_TCP_REQ_BE) + msg->event = SPOE_EV_ON_TCP_REQ_FE; + if (msg->event == SPOE_EV_ON_HTTP_REQ_BE) + msg->event = SPOE_EV_ON_HTTP_REQ_FE; + } + if (!(px->cap & PR_CAP_FE) && (msg->event == SPOE_EV_ON_CLIENT_SESS || + msg->event == SPOE_EV_ON_TCP_REQ_FE || + msg->event == SPOE_EV_ON_HTTP_REQ_FE)) { + Warning("Proxy '%s': frontend event used on a backend proxy at %s:%d.\n", + px->id, msg->conf.file, msg->conf.line); + goto next; + } + if (msg->event == SPOE_EV_NONE) { + Warning("Proxy '%s': Ignore SPOE message without event at %s:%d.\n", + px->id, msg->conf.file, msg->conf.line); + goto next; + } + msg->agent = curagent; + LIST_DEL(&msg->list); + LIST_ADDQ(&curagent->messages[msg->event], &msg->list); + goto next; + } + } + memprintf(err, "SPOE agent '%s' try to use undefined SPOE message '%s' at %s:%d", + curagent->id, mp->id, curagent->conf.file, curagent->conf.line); + goto error; + next: + continue; + } + + finish: + conf->agent = curagent; + list_for_each_entry_safe(mp, mpback, &curmps, list) { + LIST_DEL(&mp->list); + release_spoe_msg_placeholder(mp); + } + list_for_each_entry_safe(msg, msgback, &curmsgs, list) { + Warning("Proxy '%s': Ignore unused SPOE messages '%s' declared at %s:%d.\n", + px->id, msg->id, msg->conf.file, msg->conf.line); + LIST_DEL(&msg->list); + release_spoe_message(msg); + } + + *cur_arg = pos; + fconf->ops = &spoe_ops; + fconf->conf = conf; + return 0; + + error: + release_spoe_agent(curagent); + list_for_each_entry_safe(mp, mpback, &curmps, list) { + LIST_DEL(&mp->list); + release_spoe_msg_placeholder(mp); + } + list_for_each_entry_safe(msg, msgback, &curmsgs, list) { + LIST_DEL(&msg->list); + release_spoe_message(msg); + } + free(conf); + return -1; +} + + +/* Declare the filter parser for "spoe" keyword */ +static struct flt_kw_list flt_kws = { "SPOE", { }, { + { "spoe", parse_spoe_flt, NULL }, + { NULL, NULL, NULL }, + } +}; + +__attribute__((constructor)) +static void __spoe_init(void) +{ + flt_register_keywords(&flt_kws); + + LIST_INIT(&curmsgs); + LIST_INIT(&curmps); + pool2_spoe_ctx = create_pool("spoe_ctx", sizeof(struct spoe_context), MEM_F_SHARED); +} + +__attribute__((destructor)) +static void +__spoe_deinit(void) +{ + pool_destroy2(pool2_spoe_ctx); +} diff --git a/src/log.c b/src/log.c index 64c7922c9..123298881 100644 --- a/src/log.c +++ b/src/log.c @@ -267,6 +267,8 @@ static inline const char *fmt_directive(const struct proxy *curproxy) return "capture"; case ARGC_SRV: return "server"; + case ARGC_SPOE: + return "spoe-message"; default: return "undefined(please report this bug)"; /* must never happen */ } diff --git a/src/sample.c b/src/sample.c index 35b5913be..51e61836a 100644 --- a/src/sample.c +++ b/src/sample.c @@ -1127,6 +1127,7 @@ int smp_resolve_args(struct proxy *p) case ARGC_CAP: where = "in capture rule in"; break; case ARGC_ACL: ctx = "ACL keyword"; break; case ARGC_SRV: where = "in server directive in"; break; + case ARGC_SPOE: where = "in spoe-message directive in"; break; } /* set a few default settings */