diff --git a/Makefile b/Makefile index 11c3ebfb9..ed0cdd51d 100644 --- a/Makefile +++ b/Makefile @@ -843,7 +843,7 @@ OBJS += src/mux_h2.o src/mux_fcgi.o src/http_ana.o src/stream.o \ src/ebimtree.o src/uri_auth.o src/freq_ctr.o src/ebsttree.o \ src/ebistree.o src/auth.o src/wdt.o src/http_acl.o \ src/hpack-enc.o src/hpack-huff.o src/ebtree.o src/base64.o \ - src/hash.o src/dgram.o src/version.o src/fix.o + src/hash.o src/dgram.o src/version.o src/fix.o src/mqtt.o ifneq ($(TRACE),) OBJS += src/calltrace.o diff --git a/doc/configuration.txt b/doc/configuration.txt index a7dc4b69a..a9707115c 100644 --- a/doc/configuration.txt +++ b/doc/configuration.txt @@ -15410,6 +15410,93 @@ mod() This prefix is followed by a name. The separator is a '.'. The name may only contain characters 'a-z', 'A-Z', '0-9', '.' and '_'. +mqtt_field_value(,) + Returns value of found in input MQTT payload of type + . + can be either a string (case insensitive matching) or a numeric + value corresponding to the type of packet we're supposed to extract data + from. + Supported string and integers can be found here: + https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718021 + https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901022 + + depends on and can be any of the following below. + (note that matching is case insensitive). + can only be found in MQTT v5.0 streams. check this table: + https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901029 + + - CONNECT (or 1): flags, protocol_name, protocol_version, client_identifier, + will_topic, will_payload, username, password, keepalive + OR any property ID as a numeric value (for MQTT v5.0 + packets only): + 17: Session Expiry Interval + 33: Receive Maximum + 39: Maximum Packet Size + 34: Topic Alias Maximum + 25: Request Response Information + 23: Request Problem Information + 21: Authentication Method + 22: Authentication Data + 18: Will Delay Interval + 1: Payload Format Indicator + 2: Message Expiry Interval + 3: Content Type + 8: Response Topic + 9: Correlation Data + Not supported yet: + 38: User Property + + - CONNACK (or 2): flags, protocol_version, reason_code + OR any property ID as a numeric value (for MQTT v5.0 + packets only): + 17: Session Expiry Interval + 33: Receive Maximum + 36: Maximum QoS + 37: Retain Available + 39: Maximum Packet Size + 18: Assigned Client Identifier + 34: Topic Alias Maximum + 31: Reason String + 40; Wildcard Subscription Available + 41: Subscription Identifiers Available + 42: Shared Subscription Available + 19: Server Keep Alive + 26: Response Information + 28: Server Reference + 21: Authentication Method + 22: Authentication Data + Not supported yet: + 38: User Property + + Due to current HAProxy design, only the first message sent by the client and + the server can be parsed. Thus this converter can extract data only from + CONNECT and CONNACK packet types. CONNECT is the first message sent by the + client and CONNACK is the first response sent by the server. + + Example: + + acl data_in_buffer req.len ge 4 + tcp-request content set-var(txn.username) \ + req.payload(0,0),mqtt_field_value(connect,protocol_name) \ + if data_in_buffer + # do the same as above + tcp-request content set-var(txn.username) \ + req.payload(0,0),mqtt_field_value(1,protocol_name) \ + if data_in_buffer + +mqtt_is_valid + Checks that the binary input is a valid MQTT packet. It returns a boolean. + + Due to current HAProxy design, only the first message sent by the client and + the server can be parsed. Thus this converter can extract data only from + CONNECT and CONNACK packet types. CONNECT is the first message sent by the + client and CONNACK is the first response sent by the server. + + Example: + + acl data_in_buffer req.len ge 4 + tcp-request content reject unless req.payload(0,0),mqtt_is_valid + mul() Multiplies the input value of type signed integer by , and returns the product as an signed integer. In case of overflow, the largest possible diff --git a/include/haproxy/mqtt-t.h b/include/haproxy/mqtt-t.h new file mode 100644 index 000000000..937702178 --- /dev/null +++ b/include/haproxy/mqtt-t.h @@ -0,0 +1,309 @@ +/* + * include/haproxy/mqtt.h + * This file contains structure declarations for MQTT protocol. + * + * Copyright 2020 Baptiste Assmann + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, version 2.1 + * exclusively. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef _HAPROXY_MQTT_T_H +#define _HAPROXY_MQTT_T_H + +#include + +/* MQTT protocol version + * In MQTT 3.1.1, version is called "level" + */ +#define MQTT_VERSION_3_1_1 4 +#define MQTT_VERSION_5_0 5 + +/* + * return code when parsing / validating MQTT messages + */ +#define MQTT_INVALID_MESSAGE -1 +#define MQTT_NEED_MORE_DATA 0 +#define MQTT_VALID_MESSAGE 1 + + +/* + * MQTT Control Packet Type: MQTT_CPT_* + * + * Part of the fixed headers, encoded on the first packet byte : + * + * +-------+-----------+-----------+-----------+---------+----------+----------+---------+------------+ + * | bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | + * +-------+-----------+-----------+-----------+---------+----------+----------+---------+------------+ + * | field | MQTT Control Packet Type | Flags specific to each Control Packet type | + * +-------+---------------------------------------------+--------------------------------------------+ + * + * Don't forget to "left offset by 4 bits (<< 4)" the values below when matching against the fixed + * header collected in a MQTT packet. + * + * value 0x0 is reserved and forbidden + */ +enum { + MQTT_CPT_INVALID = 0, + + MQTT_CPT_CONNECT, + MQTT_CPT_CONNACK, + MQTT_CPT_PUBLISH, + MQTT_CPT_PUBACK, + MQTT_CPT_PUBREC, + MQTT_CPT_PUBREL, + MQTT_CPT_PUBCOMP, + MQTT_CPT_SUBSCRIBE, + MQTT_CPT_SUBACK, + MQTT_CPT_UNSUBSCRIBE, + MQTT_CPT_UNSUBACK, + MQTT_CPT_PINGREQ, + MQTT_CPT_PINGRESP, + MQTT_CPT_DISCONNECT, + MQTT_CPT_AUTH, + MQTT_CPT_ENTRIES /* used to mark the end/size of our MQTT_CPT_* list */ +}; + +/* MQTT CONNECT packet flags */ +#define MQTT_CONNECT_FL_RESERVED 0x01 +#define MQTT_CONNECT_FL_CLEAN_SESSION 0x02 +#define MQTT_CONNECT_FL_WILL 0x04 +#define MQTT_CONNECT_FL_WILL_QOS 0x18 /* covers 2 bits 00011000 */ +#define MQTT_CONNECT_FL_WILL_RETAIN 0x20 +#define MQTT_CONNECT_FL_PASSWORD 0x40 +#define MQTT_CONNECT_FL_USERNAME 0x80 + +/* MQTT packet properties indentifiers + * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901029 + */ +#define MQTT_PROP_PAYLOAD_FORMAT_INDICATOR 0x01 +#define MQTT_PROP_MESSAGE_EXPIRY_INTERVAL 0x02 +#define MQTT_PROP_CONTENT_TYPE 0x03 +#define MQTT_PROP_RESPONSE_TOPIC 0x08 +#define MQTT_PROP_CORRELATION_DATA 0x09 +#define MQTT_PROP_SESSION_EXPIRY_INTERVAL 0x11 +#define MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER 0x12 +#define MQTT_PROP_SERVER_KEEPALIVE 0x13 +#define MQTT_PROP_AUTHENTICATION_METHOD 0x15 +#define MQTT_PROP_AUTHENTICATION_DATA 0x16 +#define MQTT_PROP_REQUEST_PROBLEM_INFORMATION 0x17 +#define MQTT_PROP_WILL_DELAY_INTERVAL 0x18 +#define MQTT_PROP_REQUEST_RESPONSE_INFORMATION 0x19 +#define MQTT_PROP_RESPONSE_INFORMATION 0x1A +#define MQTT_PROP_SERVER_REFERENCE 0x1C +#define MQTT_PROP_RECEIVE_MAXIMUM 0x21 +#define MQTT_PROP_TOPIC_ALIAS_MAXIMUM 0x22 +#define MQTT_PROP_MAXIMUM_QOS 0x24 +#define MQTT_PROP_RETAIN_AVAILABLE 0x25 +#define MQTT_PROP_USER_PROPERTIES 0x26 +#define MQTT_PROP_MAXIMUM_PACKET_SIZE 0x27 +#define MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE 0x28 +#define MQTT_PROP_SUBSCRIPTION_IDENTIFIERS_AVAILABLE 0x29 +#define MQTT_PROP_SHARED_SUBSRIPTION_AVAILABLE 0x2A +#define MQTT_PROP_REASON_STRING 0x1F +#define MQTT_PROP_LAST 0xFF + +/* MQTT minimal packet size */ +#define MQTT_MIN_PKT_SIZE 2 +#define MQTT_REMAINING_LENGHT_MAX_SIZE 4 + +/* list of supported capturable Field Names and configuration file string */ +enum { + MQTT_FN_INVALID = 0, + + MQTT_FN_FLAGS, + MQTT_FN_REASON_CODE, + MQTT_FN_PROTOCOL_NAME, + MQTT_FN_PROTOCOL_VERSION, + MQTT_FN_CLIENT_IDENTIFIER, + MQTT_FN_WILL_TOPIC, + MQTT_FN_WILL_PAYLOAD, + MQTT_FN_USERNAME, + MQTT_FN_PASSWORD, + MQTT_FN_KEEPALIVE, + + /* MQTT 5.0 properties + * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901029 + */ + MQTT_FN_PAYLOAD_FORMAT_INDICATOR, + MQTT_FN_MESSAGE_EXPIRY_INTERVAL, + MQTT_FN_CONTENT_TYPE, + MQTT_FN_RESPONSE_TOPIC, + MQTT_FN_CORRELATION_DATA, + MQTT_FN_SUBSCRIPTION_IDENTIFIER, + MQTT_FN_SESSION_EXPIRY_INTERVAL, + MQTT_FN_ASSIGNED_CLIENT_IDENTIFIER, + MQTT_FN_SERVER_KEEPALIVE, + MQTT_FN_AUTHENTICATION_METHOD, + MQTT_FN_AUTHENTICATION_DATA, + MQTT_FN_REQUEST_PROBLEM_INFORMATION, + MQTT_FN_DELAY_INTERVAL, + MQTT_FN_REQUEST_RESPONSE_INFORMATION, + MQTT_FN_RESPONSE_INFORMATION, + MQTT_FN_SERVER_REFERENCE, + MQTT_FN_REASON_STRING, + MQTT_FN_RECEIVE_MAXIMUM, + MQTT_FN_TOPIC_ALIAS_MAXIMUM, + MQTT_FN_TOPIC_ALIAS, + MQTT_FN_MAXIMUM_QOS, + MQTT_FN_RETAIN_AVAILABLE, + MQTT_FN_USER_PROPERTY, + MQTT_FN_MAXIMUM_PACKET_SIZE, + MQTT_FN_WILDCARD_SUBSCRIPTION_AVAILABLE, + MQTT_FN_SUBSCRIPTION_IDENTIFIERS_AVAILABLE, + MQTT_FN_SHARED_SUBSCRIPTION_AVAILABLE, + + MQTT_FN_ENTRIES /* this one must always be the latest one */ +}; + +/* MQTT field string bit, for easy match using bitmasks + * ATTENTION: "user-properties" are not supported for now + */ +enum { + MQTT_FN_BIT_FLAGS = (1ULL << MQTT_FN_FLAGS), + MQTT_FN_BIT_REASON_CODE = (1ULL << MQTT_FN_REASON_CODE), + MQTT_FN_BIT_PROTOCOL_NAME = (1ULL << MQTT_FN_PROTOCOL_NAME), + MQTT_FN_BIT_PROTOCOL_VERSION = (1ULL << MQTT_FN_PROTOCOL_VERSION), + MQTT_FN_BIT_CLIENT_IDENTIFIER = (1ULL << MQTT_FN_CLIENT_IDENTIFIER), + MQTT_FN_BIT_WILL_TOPIC = (1ULL << MQTT_FN_WILL_TOPIC), + MQTT_FN_BIT_WILL_PAYLOAD = (1ULL << MQTT_FN_WILL_PAYLOAD), + MQTT_FN_BIT_USERNAME = (1ULL << MQTT_FN_USERNAME), + MQTT_FN_BIT_PASSWORD = (1ULL << MQTT_FN_PASSWORD), + MQTT_FN_BIT_KEEPALIVE = (1ULL << MQTT_FN_KEEPALIVE), + MQTT_FN_BIT_PAYLOAD_FORMAT_INDICATOR = (1ULL << MQTT_FN_PAYLOAD_FORMAT_INDICATOR), + MQTT_FN_BIT_MESSAGE_EXPIRY_INTERVAL = (1ULL << MQTT_FN_MESSAGE_EXPIRY_INTERVAL), + MQTT_FN_BIT_CONTENT_TYPE = (1ULL << MQTT_FN_CONTENT_TYPE), + MQTT_FN_BIT_RESPONSE_TOPIC = (1ULL << MQTT_FN_RESPONSE_TOPIC), + MQTT_FN_BIT_CORRELATION_DATA = (1ULL << MQTT_FN_CORRELATION_DATA), + MQTT_FN_BIT_SUBSCRIPTION_IDENTIFIER = (1ULL << MQTT_FN_SUBSCRIPTION_IDENTIFIER), + MQTT_FN_BIT_SESSION_EXPIRY_INTERVAL = (1ULL << MQTT_FN_SESSION_EXPIRY_INTERVAL), + MQTT_FN_BIT_ASSIGNED_CLIENT_IDENTIFIER = (1ULL << MQTT_FN_ASSIGNED_CLIENT_IDENTIFIER), + MQTT_FN_BIT_SERVER_KEEPALIVE = (1ULL << MQTT_FN_SERVER_KEEPALIVE), + MQTT_FN_BIT_AUTHENTICATION_METHOD = (1ULL << MQTT_FN_AUTHENTICATION_METHOD), + MQTT_FN_BIT_AUTHENTICATION_DATA = (1ULL << MQTT_FN_AUTHENTICATION_DATA), + MQTT_FN_BIT_REQUEST_PROBLEM_INFORMATION = (1ULL << MQTT_FN_REQUEST_PROBLEM_INFORMATION), + MQTT_FN_BIT_DELAY_INTERVAL = (1ULL << MQTT_FN_DELAY_INTERVAL), + MQTT_FN_BIT_REQUEST_RESPONSE_INFORMATION = (1ULL << MQTT_FN_REQUEST_RESPONSE_INFORMATION), + MQTT_FN_BIT_RESPONSE_INFORMATION = (1ULL << MQTT_FN_RESPONSE_INFORMATION), + MQTT_FN_BIT_SERVER_REFERENCE = (1ULL << MQTT_FN_SERVER_REFERENCE), + MQTT_FN_BIT_REASON_STRING = (1ULL << MQTT_FN_REASON_STRING), + MQTT_FN_BIT_RECEIVE_MAXIMUM = (1ULL << MQTT_FN_RECEIVE_MAXIMUM), + MQTT_FN_BIT_TOPIC_ALIAS_MAXIMUM = (1ULL << MQTT_FN_TOPIC_ALIAS_MAXIMUM), + MQTT_FN_BIT_TOPIC_ALIAS = (1ULL << MQTT_FN_TOPIC_ALIAS), + MQTT_FN_BIT_MAXIMUM_QOS = (1ULL << MQTT_FN_MAXIMUM_QOS), + MQTT_FN_BIT_RETAIN_AVAILABLE = (1ULL << MQTT_FN_RETAIN_AVAILABLE), + MQTT_FN_BIT_USER_PROPERTY = (1ULL << MQTT_FN_USER_PROPERTY), + MQTT_FN_BIT_MAXIMUM_PACKET_SIZE = (1ULL << MQTT_FN_MAXIMUM_PACKET_SIZE), + MQTT_FN_BIT_WILDCARD_SUBSCRIPTION_AVAILABLE = (1ULL << MQTT_FN_WILDCARD_SUBSCRIPTION_AVAILABLE), + MQTT_FN_BIT_SUBSCRIPTION_IDENTIFIERS_AVAILABLE= (1ULL << MQTT_FN_SUBSCRIPTION_IDENTIFIERS_AVAILABLE), + MQTT_FN_BIT_SHARED_SUBSCRIPTION_AVAILABLE = (1ULL << MQTT_FN_SHARED_SUBSCRIPTION_AVAILABLE), +}; + +/* structure to host fields for a MQTT CONNECT packet */ +#define MQTT_PROP_USER_PROPERTY_ENTRIES 5 +struct connect { + struct { + struct ist protocol_name; + uint8_t protocol_version; + uint8_t flags; + uint16_t keepalive; + + struct { + uint32_t session_expiry_interval; + uint16_t receive_maximum; + uint32_t maximum_packet_size; + uint16_t topic_alias_maximum; + uint8_t request_response_information; + uint8_t request_problem_information; + struct { + struct ist name; + struct ist value; + } user_props[MQTT_PROP_USER_PROPERTY_ENTRIES]; + struct ist authentication_method; + struct ist authentication_data; + } props; + } var_hdr; + struct { + struct ist client_identifier; + struct { + uint32_t delay_interval; + uint8_t payload_format_indicator; + uint32_t message_expiry_interval; + struct ist content_type; + struct ist response_topic; + struct ist correlation_data; + struct { + struct ist name; + struct ist value; + } user_props[MQTT_PROP_USER_PROPERTY_ENTRIES]; + } will_props; + struct ist will_topic; + struct ist will_payload; + struct ist username; + struct ist password; + } payload; +}; + +/* structure to host fields for a MQTT CONNACK packet */ +struct connack { + struct { + uint8_t protocol_version; + uint8_t flags; + uint8_t reason_code; + struct { + uint32_t session_expiry_interval; + uint16_t receive_maximum; + uint8_t maximum_qos; + uint8_t retain_available; + uint32_t maximum_packet_size; + struct ist assigned_client_identifier; + uint16_t topic_alias_maximum; + struct ist reason_string; + struct { + struct ist name; + struct ist value; + } user_props[MQTT_PROP_USER_PROPERTY_ENTRIES]; + uint8_t wildcard_subscription_available; + uint8_t subscription_identifiers_available; + uint8_t shared_subsription_available; + uint16_t server_keepalive; + struct ist response_information; + struct ist server_reference; + struct ist authentication_method; + struct ist authentication_data; + } props; + } var_hdr; +}; + +/* structure to host a MQTT packet */ +struct mqtt_pkt { + struct { + uint8_t type; /* MQTT_CPT_* */ + uint8_t flags; /* MQTT_CPT_FL* */ + uint32_t remaining_length; + } fixed_hdr; + union { + struct connect connect; + struct connack connack; + } data; +}; + +#endif /* _HAPROXY_MQTT_T_H */ + +/* + * Local variables: + * c-indent-level: 8 + * c-basic-offset: 8 + * End: + */ diff --git a/include/haproxy/mqtt.h b/include/haproxy/mqtt.h new file mode 100644 index 000000000..6720bb7d7 --- /dev/null +++ b/include/haproxy/mqtt.h @@ -0,0 +1,118 @@ +/* + * include/haproxt/mqtt.h + * This file contains structure declarations for MQTT protocol. + * + * Copyright 2020 Baptiste Assmann + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation, version 2.1 + * exclusively. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#ifndef _HAPROXY_MQTT_H +#define _HAPROXY_MQTT_H + +#include + +#include +#include + +/* expected flags for control packets */ +extern uint8_t mqtt_cpt_flags[MQTT_CPT_ENTRIES]; + +/* MQTT field string names */ +extern const struct ist mqtt_fields_string[MQTT_FN_ENTRIES]; + +/* list of supported capturable field names for each MQTT control packet type */ +extern const uint64_t mqtt_fields_per_packet[MQTT_CPT_ENTRIES]; + +int mqtt_validate_message(const struct ist msg, struct mqtt_pkt *mpkt); +struct ist mqtt_field_value(const struct ist msg, int type, int fieldname_id); + +/* + * Return a MQTT packet type ID based found in . + * can be a number or a string and returned value will always be the numeric value. + * + * If can't be translated into an ID, then MQTT_CPT_INVALID (0) is returned. + */ +static inline int mqtt_typeid(struct ist str) +{ + int id; + + id = strl2ui(str.ptr, istlen(str)); + if ((id >= MQTT_CPT_CONNECT) && (id < MQTT_CPT_ENTRIES)) + return id; + + else if (isteqi(str, ist("CONNECT")) != 0) + return MQTT_CPT_CONNECT; + else if (isteqi(str, ist("CONNACK")) != 0) + return MQTT_CPT_CONNACK; + else if (isteqi(str, ist("PUBLISH")) != 0) + return MQTT_CPT_PUBLISH; + else if (isteqi(str, ist("PUBACK")) != 0) + return MQTT_CPT_PUBACK; + else if (isteqi(str, ist("PUBREC")) != 0) + return MQTT_CPT_PUBREC; + else if (isteqi(str, ist("PUBREL")) != 0) + return MQTT_CPT_PUBREL; + else if (isteqi(str, ist("PUBCOMP")) != 0) + return MQTT_CPT_PUBCOMP; + else if (isteqi(str, ist("SUBSCRIBE")) != 0) + return MQTT_CPT_SUBSCRIBE; + else if (isteqi(str, ist("SUBACK")) != 0) + return MQTT_CPT_SUBACK; + else if (isteqi(str, ist("UNSUBSCRIBE")) != 0) + return MQTT_CPT_UNSUBSCRIBE; + else if (isteqi(str, ist("UNSUBACK")) != 0) + return MQTT_CPT_UNSUBACK; + else if (isteqi(str, ist("PINGREQ")) != 0) + return MQTT_CPT_PINGREQ; + else if (isteqi(str, ist("PINGRESP")) != 0) + return MQTT_CPT_PINGRESP; + else if (isteqi(str, ist("DISCONNECT")) != 0) + return MQTT_CPT_DISCONNECT; + else if (isteqi(str, ist("AUTH")) != 0) + return MQTT_CPT_AUTH; + + return MQTT_CPT_INVALID; +} + +/* + * validate that is a field that can be extracted from a MQTT packet + * + * return the field name ID (MQTT_FN_*) if a match is found, MQTT_FN_INVALID (0) otherwise. + */ +static inline int mqtt_check_type_fieldname(int type, struct ist str) +{ + int i, id = MQTT_FN_INVALID; + + for (i = 0; i < MQTT_FN_ENTRIES; i++) { + if (isteqi(str, mqtt_fields_string[i])) { + if (mqtt_fields_per_packet[type] & (1ULL << i)) + id = i; + break; + } + } + + return id; + +} + +#endif /* _HAPROXY_MQTT_H */ + +/* + * Local variables: + * c-indent-level: 8 + * c-basic-offset: 8 + * End: + */ diff --git a/src/mqtt.c b/src/mqtt.c new file mode 100644 index 000000000..c906fd43c --- /dev/null +++ b/src/mqtt.c @@ -0,0 +1,1280 @@ +/* + * MQTT Protocol + * + * Copyright 2020 Baptiste Assmann + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version + * 2 of the License, or (at your option) any later version. + * + */ + +#include +#include + +uint8_t mqtt_cpt_flags[MQTT_CPT_ENTRIES] = { + [MQTT_CPT_INVALID] = 0x00, + [MQTT_CPT_CONNECT] = 0x00, + [MQTT_CPT_CONNACK] = 0x00, + + /* MQTT_CPT_PUBLISH flags can have different values (DUP, QoS, RETAIN), must be + * check more carefully + */ + [MQTT_CPT_PUBLISH] = 0x0F, + + [MQTT_CPT_PUBACK] = 0x00, + [MQTT_CPT_PUBREC] = 0x00, + [MQTT_CPT_PUBREL] = 0x02, + [MQTT_CPT_PUBCOMP] = 0x00, + [MQTT_CPT_SUBSCRIBE] = 0x02, + [MQTT_CPT_SUBACK] = 0x00, + [MQTT_CPT_UNSUBSCRIBE] = 0x02, + [MQTT_CPT_UNSUBACK] = 0x00, + [MQTT_CPT_PINGREQ] = 0x00, + [MQTT_CPT_PINGRESP] = 0x00, + [MQTT_CPT_DISCONNECT] = 0x00, + [MQTT_CPT_AUTH] = 0x00, +}; + +const struct ist mqtt_fields_string[MQTT_FN_ENTRIES] = { + [MQTT_FN_INVALID] = IST(""), + + /* it's MQTT 3.1.1 and 5.0, those fields have no unique id, so we use strings */ + [MQTT_FN_FLAGS] = IST("flags"), + [MQTT_FN_REASON_CODE] = IST("reason_code"), /* MQTT 3.1.1: return_code */ + [MQTT_FN_PROTOCOL_NAME] = IST("protocol_name"), + [MQTT_FN_PROTOCOL_VERSION] = IST("protocol_version"), /* MQTT 3.1.1: protocol_level */ + [MQTT_FN_CLIENT_IDENTIFIER] = IST("client_identifier"), + [MQTT_FN_WILL_TOPIC] = IST("will_topic"), + [MQTT_FN_WILL_PAYLOAD] = IST("will_payload"), /* MQTT 3.1.1: will_message */ + [MQTT_FN_USERNAME] = IST("username"), + [MQTT_FN_PASSWORD] = IST("password"), + [MQTT_FN_KEEPALIVE] = IST("keepalive"), + /* from here, it's MQTT 5.0 only */ + [MQTT_FN_PAYLOAD_FORMAT_INDICATOR] = IST("1"), + [MQTT_FN_MESSAGE_EXPIRY_INTERVAL] = IST("2"), + [MQTT_FN_CONTENT_TYPE] = IST("3"), + [MQTT_FN_RESPONSE_TOPIC] = IST("8"), + [MQTT_FN_CORRELATION_DATA] = IST("9"), + [MQTT_FN_SUBSCRIPTION_IDENTIFIER] = IST("11"), + [MQTT_FN_SESSION_EXPIRY_INTERVAL] = IST("17"), + [MQTT_FN_ASSIGNED_CLIENT_IDENTIFIER] = IST("18"), + [MQTT_FN_SERVER_KEEPALIVE] = IST("19"), + [MQTT_FN_AUTHENTICATION_METHOD] = IST("21"), + [MQTT_FN_AUTHENTICATION_DATA] = IST("22"), + [MQTT_FN_REQUEST_PROBLEM_INFORMATION] = IST("23"), + [MQTT_FN_DELAY_INTERVAL] = IST("24"), + [MQTT_FN_REQUEST_RESPONSE_INFORMATION] = IST("25"), + [MQTT_FN_RESPONSE_INFORMATION] = IST("26"), + [MQTT_FN_SERVER_REFERENCE] = IST("28"), + [MQTT_FN_REASON_STRING] = IST("31"), + [MQTT_FN_RECEIVE_MAXIMUM] = IST("33"), + [MQTT_FN_TOPIC_ALIAS_MAXIMUM] = IST("34"), + [MQTT_FN_TOPIC_ALIAS] = IST("35"), + [MQTT_FN_MAXIMUM_QOS] = IST("36"), + [MQTT_FN_RETAIN_AVAILABLE] = IST("37"), + [MQTT_FN_USER_PROPERTY] = IST("38"), + [MQTT_FN_MAXIMUM_PACKET_SIZE] = IST("39"), + [MQTT_FN_WILDCARD_SUBSCRIPTION_AVAILABLE] = IST("40"), + [MQTT_FN_SUBSCRIPTION_IDENTIFIERS_AVAILABLE] = IST("41"), + [MQTT_FN_SHARED_SUBSCRIPTION_AVAILABLE] = IST("42"), +}; + +/* list of supported capturable field names for each MQTT control packet type */ +const uint64_t mqtt_fields_per_packet[MQTT_CPT_ENTRIES] = { + [MQTT_CPT_INVALID] = 0, + + [MQTT_CPT_CONNECT] = MQTT_FN_BIT_PROTOCOL_NAME | MQTT_FN_BIT_PROTOCOL_VERSION | + MQTT_FN_BIT_FLAGS | MQTT_FN_BIT_KEEPALIVE | + MQTT_FN_BIT_SESSION_EXPIRY_INTERVAL | MQTT_FN_BIT_RECEIVE_MAXIMUM | + MQTT_FN_BIT_MAXIMUM_PACKET_SIZE | MQTT_FN_BIT_TOPIC_ALIAS_MAXIMUM | + MQTT_FN_BIT_REQUEST_RESPONSE_INFORMATION | MQTT_FN_BIT_REQUEST_PROBLEM_INFORMATION | + MQTT_FN_BIT_USER_PROPERTY | MQTT_FN_BIT_AUTHENTICATION_METHOD | + MQTT_FN_BIT_AUTHENTICATION_DATA | MQTT_FN_BIT_CLIENT_IDENTIFIER | + MQTT_FN_BIT_DELAY_INTERVAL | MQTT_FN_BIT_PAYLOAD_FORMAT_INDICATOR | + MQTT_FN_BIT_MESSAGE_EXPIRY_INTERVAL | MQTT_FN_BIT_CONTENT_TYPE | + MQTT_FN_BIT_RESPONSE_TOPIC | MQTT_FN_BIT_CORRELATION_DATA | + MQTT_FN_BIT_USER_PROPERTY | MQTT_FN_BIT_WILL_TOPIC | + MQTT_FN_BIT_WILL_PAYLOAD | MQTT_FN_BIT_USERNAME | + MQTT_FN_BIT_PASSWORD, + + [MQTT_CPT_CONNACK] = MQTT_FN_BIT_FLAGS | MQTT_FN_BIT_PROTOCOL_VERSION | + MQTT_FN_BIT_REASON_CODE | MQTT_FN_BIT_SESSION_EXPIRY_INTERVAL | + MQTT_FN_BIT_RECEIVE_MAXIMUM | MQTT_FN_BIT_MAXIMUM_QOS | + MQTT_FN_BIT_RETAIN_AVAILABLE | MQTT_FN_BIT_MAXIMUM_PACKET_SIZE | + MQTT_FN_BIT_ASSIGNED_CLIENT_IDENTIFIER | MQTT_FN_BIT_TOPIC_ALIAS_MAXIMUM | + MQTT_FN_BIT_REASON_STRING | MQTT_FN_BIT_WILDCARD_SUBSCRIPTION_AVAILABLE | + MQTT_FN_BIT_SUBSCRIPTION_IDENTIFIERS_AVAILABLE| MQTT_FN_BIT_SHARED_SUBSCRIPTION_AVAILABLE | + MQTT_FN_BIT_SERVER_KEEPALIVE | MQTT_FN_BIT_RESPONSE_INFORMATION | + MQTT_FN_BIT_SERVER_REFERENCE | MQTT_FN_BIT_USER_PROPERTY | + MQTT_FN_BIT_AUTHENTICATION_METHOD | MQTT_FN_BIT_AUTHENTICATION_DATA, + + [MQTT_CPT_PUBLISH] = MQTT_FN_BIT_PAYLOAD_FORMAT_INDICATOR | MQTT_FN_BIT_MESSAGE_EXPIRY_INTERVAL | + MQTT_FN_BIT_CONTENT_TYPE | MQTT_FN_BIT_RESPONSE_TOPIC | + MQTT_FN_BIT_CORRELATION_DATA | MQTT_FN_BIT_SUBSCRIPTION_IDENTIFIER | + MQTT_FN_BIT_TOPIC_ALIAS | MQTT_FN_BIT_USER_PROPERTY, + + [MQTT_CPT_PUBACK] = MQTT_FN_BIT_REASON_CODE | MQTT_FN_BIT_REASON_STRING | MQTT_FN_BIT_USER_PROPERTY, + + [MQTT_CPT_PUBREC] = MQTT_FN_BIT_REASON_CODE | MQTT_FN_BIT_REASON_STRING | MQTT_FN_BIT_USER_PROPERTY, + + [MQTT_CPT_PUBREL] = MQTT_FN_BIT_REASON_CODE | MQTT_FN_BIT_REASON_STRING | MQTT_FN_BIT_USER_PROPERTY, + + [MQTT_CPT_PUBCOMP] = MQTT_FN_BIT_REASON_CODE | MQTT_FN_BIT_REASON_STRING | MQTT_FN_BIT_USER_PROPERTY, + + [MQTT_CPT_SUBSCRIBE] = MQTT_FN_BIT_SUBSCRIPTION_IDENTIFIER | MQTT_FN_BIT_USER_PROPERTY, + + [MQTT_CPT_SUBACK] = MQTT_FN_BIT_REASON_STRING | MQTT_FN_BIT_USER_PROPERTY, + + [MQTT_CPT_UNSUBSCRIBE] = MQTT_FN_BIT_USER_PROPERTY, + + [MQTT_CPT_UNSUBACK] = MQTT_FN_BIT_REASON_STRING | MQTT_FN_BIT_USER_PROPERTY, + + [MQTT_CPT_PINGREQ] = 0, + + [MQTT_CPT_PINGRESP] = 0, + + [MQTT_CPT_DISCONNECT] = MQTT_FN_BIT_REASON_CODE | MQTT_FN_BIT_SESSION_EXPIRY_INTERVAL | + MQTT_FN_BIT_SERVER_REFERENCE | MQTT_FN_BIT_REASON_STRING | + MQTT_FN_BIT_USER_PROPERTY, + + [MQTT_CPT_AUTH] = MQTT_FN_BIT_AUTHENTICATION_METHOD | MQTT_FN_BIT_AUTHENTICATION_DATA | + MQTT_FN_BIT_REASON_STRING | MQTT_FN_BIT_USER_PROPERTY, +}; + +/* Checks the first byte of a message to read the fixed header and extract the + * packet type and flags. is supposed to point to the fix header byte. + * + * Fix header looks like: + * +-------+-----------+-----------+-----------+---------+----------+----------+---------+------------+ + * | bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | + * +-------+-----------+-----------+-----------+---------+----------+----------+---------+------------+ + * | field | MQTT Control Packet Type | Flags specific to each Control Packet type | + * +-------+---------------------------------------------+--------------------------------------------+ + * + * On success, is updated with the packet type and flags and the new parser + * state is returned. On error, IST_NULL is returned. + */ +static inline struct ist mqtt_read_fixed_hdr(struct ist parser, struct mqtt_pkt *pkt) +{ + uint8_t type = (uint8_t)*istptr(parser); + uint8_t ptype = (type & 0xF0) >> 4; + uint8_t flags = type & 0x0F; + + if (ptype == MQTT_CPT_INVALID || ptype >= MQTT_CPT_ENTRIES || flags != mqtt_cpt_flags[ptype]) + return IST_NULL; + + pkt->fixed_hdr.type = ptype; + pkt->fixed_hdr.flags = flags; + return istnext(parser); +} + +/* Reads a one byte integer. more information here : + * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901007 + * + * is supposed to point to the first byte of the integer. On success + * the integer is stored in <*i>, if provided, and the new parser state is returned. On + * error, IST_NULL is returned. +*/ +static inline struct ist mqtt_read_1byte_int(struct ist parser, uint8_t *i) +{ + if (istlen(parser) < 1) + return IST_NULL; + if (i) + *i = (uint8_t)*istptr(parser); + parser = istadv(parser, 1); + return parser; +} + +/* Reads a two byte integer. more information here : + * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901008 + * + * is supposed to point to the first byte of the integer. On success + * the integer is stored in <*i>, if provided, and the new parser state is returned. On + * error, IST_NULL is returned. +*/ +static inline struct ist mqtt_read_2byte_int(struct ist parser, uint16_t *i) +{ + if (istlen(parser) < 2) + return IST_NULL; + if (i) { + *i = (uint8_t)*istptr(parser) << 8; + *i += (uint8_t)*(istptr(parser) + 1); + } + parser = istadv(parser, 2); + return parser; +} + +/* Reads a four byte integer. more information here : + * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901009 + * + * is supposed to point to the first byte of the integer. On success + * the integer is stored in <*i>, if provided, and the new parser state is returned. On + * error, IST_NULL is returned. +*/ +static inline struct ist mqtt_read_4byte_int(struct ist parser, uint32_t *i) +{ + if (istlen(parser) < 4) + return IST_NULL; + if (i) { + *i = (uint8_t)*istptr(parser) << 24; + *i += (uint8_t)*(istptr(parser) + 1) << 16; + *i += (uint8_t)*(istptr(parser) + 2) << 8; + *i += (uint8_t)*(istptr(parser) + 3); + } + parser = istadv(parser, 4); + return parser; +} + +/* Reads a variable byte integer. more information here : + * https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718023 + * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901011 + * + * It is encoded using a variable length encoding scheme which uses a single + * byte for values up to 127. Larger values are handled as follows. The least + * significant seven bits of each byte encode the data, and the most significant + * bit is used to indicate that there are following bytes in the representation. + * Thus each byte encodes 128 values and a "continuation bit". + * + * The maximum number of bytes in the Remaining Length field is four + * (MQTT_REMAINING_LENGHT_MAX_SIZE). + * + * is supposed to point to the first byte of the integer. On success + * the integer is stored in <*i> and the new parser state is returned. On + * error, IST_NULL is returned. + */ +static inline struct ist mqtt_read_varint(struct ist parser, uint32_t *i) +{ + int off, m; + + off = m = 0; + if (i) + *i = 0; + for (off = 0; off < MQTT_REMAINING_LENGHT_MAX_SIZE && istlen(parser); off++) { + uint8_t byte = (uint8_t)*istptr(parser); + + if (i) { + *i += (byte & 127) << m; + m += 7; /* preparing for next byte */ + } + parser = istnext(parser); + + /* we read the latest byte for the remaining length field */ + if (byte <= 127) + break; + } + + if (off == MQTT_REMAINING_LENGHT_MAX_SIZE) + return IST_NULL; + return parser; +} + +/* Reads a MQTT string. more information here : + * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718016 + * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901010 + * + * In MQTT, strings are prefixed by their size, encoded over 2 bytes: + * byte 1: length MSB + * byte 2: length LSB + * byte 3: string + * ... + * + * string size is MSB * 256 + LSB + * + * is supposed to point to the first byte of the string. On success the + * string is stored in <*str>, if provided, and the new parser state is + * returned. On error, IST_NULL is returned. + */ +static inline struct ist mqtt_read_string(struct ist parser, struct ist *str) +{ + uint16_t len; + + /* read and compute the string length */ + if (istlen(parser) <= 2) + goto error; + + len = ((uint16_t)*istptr(parser) << 8) + (uint16_t)*(istptr(parser) + 1); + parser = istadv(parser, 2); + if (istlen(parser) < len) + goto error; + + if (str) { + str->ptr = istptr(parser); + str->len = len; + } + + return istadv(parser, len); + + error: + return IST_NULL; +} + +/* Helper function to convert a unsigned integer to a string. The result is + * written in . On success, the written size is returned, otherwise, on + * error, 0 is returned. + */ +static inline size_t mqtt_uint2str(struct buffer *buf, uint32_t i) +{ + char *end; + + end = ultoa_o(i, buf->area, buf->size); + if (!end) + return 0; + buf->data = end - buf->area; + return buf->data; +} + +/* Extracts the value of a of type from a given MQTT + * message . IST_NULL is returned if an error occured while parsing or if + * the field could not be found. If more data are required, the message with a + * length set to 0 is returned. If the field is found, the response is returned + * as a struct ist. + */ +struct ist mqtt_field_value(struct ist msg, int type, int fieldname_id) +{ + struct buffer *trash = get_trash_chunk(); + struct mqtt_pkt mpkt; + struct ist res; + + switch (mqtt_validate_message(msg, &mpkt)) { + case MQTT_VALID_MESSAGE: + if (mpkt.fixed_hdr.type != type) + goto not_found_or_invalid; + break; + case MQTT_NEED_MORE_DATA: + goto need_more; + case MQTT_INVALID_MESSAGE: + goto not_found_or_invalid; + } + + switch (type) { + case MQTT_CPT_CONNECT: + switch (fieldname_id) { + case MQTT_FN_FLAGS: + if (!mqtt_uint2str(trash, mpkt.data.connect.var_hdr.flags)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_PROTOCOL_NAME: + if (!istlen(mpkt.data.connect.var_hdr.protocol_name)) + goto not_found_or_invalid; + res = mpkt.data.connect.var_hdr.protocol_name; + goto end; + + case MQTT_FN_PROTOCOL_VERSION: + if (!mqtt_uint2str(trash, mpkt.data.connect.var_hdr.protocol_version)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_CLIENT_IDENTIFIER: + if (!istlen(mpkt.data.connect.payload.client_identifier)) + goto not_found_or_invalid; + res = mpkt.data.connect.payload.client_identifier; + goto end; + + case MQTT_FN_WILL_TOPIC: + if (!istlen(mpkt.data.connect.payload.will_topic)) + goto not_found_or_invalid; + res = mpkt.data.connect.payload.will_topic; + goto end; + + case MQTT_FN_WILL_PAYLOAD: + if (!istlen(mpkt.data.connect.payload.will_payload)) + goto not_found_or_invalid; + res = mpkt.data.connect.payload.will_payload; + goto end; + + case MQTT_FN_USERNAME: + if (!istlen(mpkt.data.connect.payload.username)) + goto not_found_or_invalid; + res = mpkt.data.connect.payload.username; + goto end; + + case MQTT_FN_PASSWORD: + if (!istlen(mpkt.data.connect.payload.password)) + goto not_found_or_invalid; + res = mpkt.data.connect.payload.password; + goto end; + + case MQTT_FN_KEEPALIVE: + if (!mqtt_uint2str(trash, mpkt.data.connect.var_hdr.keepalive)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_PAYLOAD_FORMAT_INDICATOR: + if ((mpkt.data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) || + !(mpkt.data.connect.var_hdr.flags & MQTT_CONNECT_FL_WILL)) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connect.payload.will_props.payload_format_indicator)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_MESSAGE_EXPIRY_INTERVAL: + if ((mpkt.data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) || + !(mpkt.data.connect.var_hdr.flags & MQTT_CONNECT_FL_WILL)) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connect.payload.will_props.message_expiry_interval)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_CONTENT_TYPE: + if ((mpkt.data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) || + !(mpkt.data.connect.var_hdr.flags & MQTT_CONNECT_FL_WILL)) + goto not_found_or_invalid; + if (!istlen(mpkt.data.connect.payload.will_props.content_type)) + goto not_found_or_invalid; + res = mpkt.data.connect.payload.will_props.content_type; + goto end; + + case MQTT_FN_RESPONSE_TOPIC: + if ((mpkt.data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) || + !(mpkt.data.connect.var_hdr.flags & MQTT_CONNECT_FL_WILL)) + goto not_found_or_invalid; + if (!istlen(mpkt.data.connect.payload.will_props.response_topic)) + goto not_found_or_invalid; + res = mpkt.data.connect.payload.will_props.response_topic; + goto end; + + case MQTT_FN_CORRELATION_DATA: + if ((mpkt.data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) || + !(mpkt.data.connect.var_hdr.flags & MQTT_CONNECT_FL_WILL)) + goto not_found_or_invalid; + if (!istlen(mpkt.data.connect.payload.will_props.correlation_data)) + goto not_found_or_invalid; + res = mpkt.data.connect.payload.will_props.correlation_data; + goto end; + + case MQTT_FN_SESSION_EXPIRY_INTERVAL: + if (mpkt.data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connect.var_hdr.props.session_expiry_interval)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_AUTHENTICATION_METHOD: + if (mpkt.data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!istlen(mpkt.data.connect.var_hdr.props.authentication_method)) + goto not_found_or_invalid; + res = mpkt.data.connect.var_hdr.props.authentication_method; + goto end; + + case MQTT_FN_AUTHENTICATION_DATA: + if (mpkt.data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!istlen(mpkt.data.connect.var_hdr.props.authentication_data)) + goto not_found_or_invalid; + res = mpkt.data.connect.var_hdr.props.authentication_data; + goto end; + + case MQTT_FN_REQUEST_PROBLEM_INFORMATION: + if (mpkt.data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connect.var_hdr.props.request_problem_information)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_DELAY_INTERVAL: + if ((mpkt.data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) || + !(mpkt.data.connect.var_hdr.flags & MQTT_CONNECT_FL_WILL)) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connect.payload.will_props.delay_interval)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_REQUEST_RESPONSE_INFORMATION: + if (mpkt.data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connect.var_hdr.props.request_response_information)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_RECEIVE_MAXIMUM: + if (mpkt.data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connect.var_hdr.props.receive_maximum)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_TOPIC_ALIAS_MAXIMUM: + if (mpkt.data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connect.var_hdr.props.topic_alias_maximum)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_MAXIMUM_PACKET_SIZE: + if (mpkt.data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connect.var_hdr.props.maximum_packet_size)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + default: + goto not_found_or_invalid; + } + break; + + case MQTT_CPT_CONNACK: + switch (fieldname_id) { + case MQTT_FN_FLAGS: + if (!mqtt_uint2str(trash, mpkt.data.connack.var_hdr.flags)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_REASON_CODE: + if (!mqtt_uint2str(trash, mpkt.data.connack.var_hdr.reason_code)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_PROTOCOL_VERSION: + if (!mqtt_uint2str(trash, mpkt.data.connack.var_hdr.protocol_version)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_SESSION_EXPIRY_INTERVAL: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connack.var_hdr.props.session_expiry_interval)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_ASSIGNED_CLIENT_IDENTIFIER: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!istlen(mpkt.data.connack.var_hdr.props.assigned_client_identifier)) + goto not_found_or_invalid; + res = mpkt.data.connack.var_hdr.props.assigned_client_identifier; + goto end; + + case MQTT_FN_SERVER_KEEPALIVE: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connack.var_hdr.props.server_keepalive)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_AUTHENTICATION_METHOD: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!istlen(mpkt.data.connack.var_hdr.props.authentication_method)) + goto not_found_or_invalid; + res = mpkt.data.connack.var_hdr.props.authentication_method; + goto end; + + case MQTT_FN_AUTHENTICATION_DATA: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!istlen(mpkt.data.connack.var_hdr.props.authentication_data)) + goto not_found_or_invalid; + res = mpkt.data.connack.var_hdr.props.authentication_data; + goto end; + + case MQTT_FN_RESPONSE_INFORMATION: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!istlen(mpkt.data.connack.var_hdr.props.response_information)) + goto not_found_or_invalid; + res = mpkt.data.connack.var_hdr.props.response_information; + goto end; + + case MQTT_FN_SERVER_REFERENCE: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!istlen(mpkt.data.connack.var_hdr.props.server_reference)) + goto not_found_or_invalid; + res = mpkt.data.connack.var_hdr.props.server_reference; + goto end; + + case MQTT_FN_REASON_STRING: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!istlen(mpkt.data.connack.var_hdr.props.reason_string)) + goto not_found_or_invalid; + res = mpkt.data.connack.var_hdr.props.reason_string; + goto end; + + case MQTT_FN_RECEIVE_MAXIMUM: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connack.var_hdr.props.receive_maximum)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_TOPIC_ALIAS_MAXIMUM: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connack.var_hdr.props.topic_alias_maximum)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_MAXIMUM_QOS: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connack.var_hdr.props.maximum_qos)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_RETAIN_AVAILABLE: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connack.var_hdr.props.retain_available)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_MAXIMUM_PACKET_SIZE: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connack.var_hdr.props.maximum_packet_size)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_WILDCARD_SUBSCRIPTION_AVAILABLE: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connack.var_hdr.props.wildcard_subscription_available)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_SUBSCRIPTION_IDENTIFIERS_AVAILABLE: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connack.var_hdr.props.subscription_identifiers_available)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + case MQTT_FN_SHARED_SUBSCRIPTION_AVAILABLE: + if (mpkt.data.connack.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto not_found_or_invalid; + if (!mqtt_uint2str(trash, mpkt.data.connack.var_hdr.props.shared_subsription_available)) + goto not_found_or_invalid; + res = ist2(trash->area, trash->data); + goto end; + + default: + goto not_found_or_invalid; + } + break; + + default: + goto not_found_or_invalid; + } + + end: + return res; + + need_more: + return ist2(istptr(msg), 0); + + not_found_or_invalid: + return IST_NULL; +} + +/* Parses a CONNECT packet : + * https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028 + * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901033 + * + * should point right after the MQTT fixed header. The remaining length + * was already checked, thus missing data is an error. On success, the result of + * the parsing is stored in . + * + * Returns: + * MQTT_INVALID_MESSAGE if the CONNECT message is invalid + * MQTT_VALID_MESSAGE if the CONNECT message looks valid + */ +static int mqtt_parse_connect(struct ist parser, struct mqtt_pkt *mpkt) +{ + /* The parser length is stored to be sure exactly consumed the announced + * remaining length. */ + size_t orig_len = istlen(parser); + int ret = MQTT_INVALID_MESSAGE; + + /* + * parsing variable header + */ + /* read protocol_name */ + parser = mqtt_read_string(parser, &mpkt->data.connect.var_hdr.protocol_name); + if (!isttest(parser) || !isteqi(mpkt->data.connect.var_hdr.protocol_name, ist("MQTT"))) + goto end; + + /* read protocol_version */ + parser = mqtt_read_1byte_int(parser, &mpkt->data.connect.var_hdr.protocol_version); + if (!isttest(parser)) + goto end; + if (mpkt->data.connect.var_hdr.protocol_version != MQTT_VERSION_3_1_1 && + mpkt->data.connect.var_hdr.protocol_version != MQTT_VERSION_5_0) + goto end; + + /* read flags */ + /* bit 1 is 'reserved' and must be set to 0 in CONNECT message flags */ + parser = mqtt_read_1byte_int(parser, &mpkt->data.connect.var_hdr.flags); + if (!isttest(parser) || (mpkt->data.connect.var_hdr.flags & MQTT_CONNECT_FL_RESERVED)) + goto end; + + /* if WILL flag must be set to have WILL_QOS flag or WILL_RETAIN set */ + if ((mpkt->data.connect.var_hdr.flags & (MQTT_CONNECT_FL_WILL|MQTT_CONNECT_FL_WILL_QOS|MQTT_CONNECT_FL_WILL_RETAIN)) == MQTT_CONNECT_FL_WILL_QOS) + goto end; + + /* read keepalive */ + parser = mqtt_read_2byte_int(parser, &mpkt->data.connect.var_hdr.keepalive); + if (!isttest(parser)) + goto end; + + /* read properties, only available in MQTT_VERSION_5_0 */ + if (mpkt->data.connect.var_hdr.protocol_version == MQTT_VERSION_5_0) { + struct ist props; + unsigned int user_prop_idx = 0; + uint64_t fields = 0; + uint32_t plen = 0; + + parser = mqtt_read_varint(parser, &plen); + if (!isttest(parser) || istlen(parser) < plen) + goto end; + props = ist2(istptr(parser), plen); + parser = istadv(parser, props.len); + + while (istlen(props) > 0) { + switch (*istptr(props)) { + case MQTT_PROP_SESSION_EXPIRY_INTERVAL: + if (fields & MQTT_FN_BIT_SESSION_EXPIRY_INTERVAL) + goto end; + props = mqtt_read_4byte_int(istnext(props), &mpkt->data.connect.var_hdr.props.session_expiry_interval); + fields |= MQTT_FN_BIT_SESSION_EXPIRY_INTERVAL; + break; + + case MQTT_PROP_RECEIVE_MAXIMUM: + if (fields & MQTT_FN_BIT_RECEIVE_MAXIMUM) + goto end; + props = mqtt_read_2byte_int(istnext(props), &mpkt->data.connect.var_hdr.props.receive_maximum); + /* cannot be 0 */ + if (!mpkt->data.connect.var_hdr.props.receive_maximum) + goto end; + fields |= MQTT_FN_BIT_RECEIVE_MAXIMUM; + break; + + case MQTT_PROP_MAXIMUM_PACKET_SIZE: + if (fields & MQTT_FN_BIT_MAXIMUM_PACKET_SIZE) + goto end; + props = mqtt_read_4byte_int(istnext(props), &mpkt->data.connect.var_hdr.props.maximum_packet_size); + /* cannot be 0 */ + if (!mpkt->data.connect.var_hdr.props.maximum_packet_size) + goto end; + fields |= MQTT_FN_BIT_MAXIMUM_PACKET_SIZE; + break; + + case MQTT_PROP_TOPIC_ALIAS_MAXIMUM: + if (fields & MQTT_FN_BIT_TOPIC_ALIAS) + goto end; + props = mqtt_read_2byte_int(istnext(props), &mpkt->data.connect.var_hdr.props.topic_alias_maximum); + fields |= MQTT_FN_BIT_TOPIC_ALIAS; + break; + + case MQTT_PROP_REQUEST_RESPONSE_INFORMATION: + if (fields & MQTT_FN_BIT_REQUEST_RESPONSE_INFORMATION) + goto end; + props = mqtt_read_1byte_int(istnext(props), &mpkt->data.connect.var_hdr.props.request_response_information); + /* can have only 2 values: 0 or 1 */ + if (mpkt->data.connect.var_hdr.props.request_response_information > 1) + goto end; + fields |= MQTT_FN_BIT_REQUEST_RESPONSE_INFORMATION; + break; + + case MQTT_PROP_REQUEST_PROBLEM_INFORMATION: + if (fields & MQTT_FN_BIT_REQUEST_PROBLEM_INFORMATION) + goto end; + props = mqtt_read_1byte_int(istnext(props), &mpkt->data.connect.var_hdr.props.request_problem_information); + /* can have only 2 values: 0 or 1 */ + if (mpkt->data.connect.var_hdr.props.request_problem_information > 1) + goto end; + fields |= MQTT_FN_BIT_REQUEST_PROBLEM_INFORMATION; + break; + + case MQTT_PROP_USER_PROPERTIES: + /* if we reached MQTT_PROP_USER_PROPERTY_ENTRIES already, then + * we start writing over the first property */ + if (user_prop_idx >= MQTT_PROP_USER_PROPERTY_ENTRIES) + user_prop_idx = 0; + + /* read user property name and value */ + props = mqtt_read_string(istnext(props), &mpkt->data.connect.var_hdr.props.user_props[user_prop_idx].name); + if (!isttest(props)) + goto end; + props = mqtt_read_string(props, &mpkt->data.connect.var_hdr.props.user_props[user_prop_idx].value); + ++user_prop_idx; + break; + + case MQTT_PROP_AUTHENTICATION_METHOD: + if (fields & MQTT_FN_BIT_AUTHENTICATION_METHOD) + goto end; + props = mqtt_read_string(istnext(props), &mpkt->data.connect.var_hdr.props.authentication_method); + fields |= MQTT_FN_BIT_AUTHENTICATION_METHOD; + break; + + case MQTT_PROP_AUTHENTICATION_DATA: + if (fields & MQTT_FN_BIT_AUTHENTICATION_DATA) + goto end; + props = mqtt_read_string(istnext(props), &mpkt->data.connect.var_hdr.props.authentication_data); + fields |= MQTT_FN_BIT_AUTHENTICATION_DATA; + break; + + default: + goto end; + } + + if (!isttest(props)) + goto end; + } + } + + /* cannot have auth data without auth method */ + if (!istlen(mpkt->data.connect.var_hdr.props.authentication_method) && + istlen(mpkt->data.connect.var_hdr.props.authentication_data)) + goto end; + + /* parsing payload + * + * Content of payload is realted to flags parsed above and the field order is pre-defined: + * Client Identifier, Will Topic, Will Message, User Name, Password + */ + /* read client identifier */ + parser = mqtt_read_string(parser, &mpkt->data.connect.payload.client_identifier); + if (!isttest(parser) || !istlen(mpkt->data.connect.payload.client_identifier)) + goto end; + + /* read Will Properties, for MQTT v5 only + * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901060 + */ + if ((mpkt->data.connect.var_hdr.protocol_version == MQTT_VERSION_5_0) && + (mpkt->data.connect.var_hdr.flags & MQTT_CONNECT_FL_WILL)) { + struct ist props; + unsigned int user_prop_idx = 0; + uint64_t fields = 0; + uint32_t plen = 0; + + parser = mqtt_read_varint(parser, &plen); + if (!isttest(parser) || istlen(parser) < plen) + goto end; + props = ist2(istptr(parser), plen); + parser = istadv(parser, props.len); + + while (istlen(props) > 0) { + switch (*istptr(props)) { + case MQTT_PROP_WILL_DELAY_INTERVAL: + if (fields & MQTT_FN_BIT_DELAY_INTERVAL) + goto end; + props = mqtt_read_4byte_int(istnext(props), &mpkt->data.connect.payload.will_props.delay_interval); + fields |= MQTT_FN_BIT_DELAY_INTERVAL; + break; + + case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR: + if (fields & MQTT_FN_BIT_PAYLOAD_FORMAT_INDICATOR) + goto end; + props = mqtt_read_1byte_int(istnext(props), &mpkt->data.connect.payload.will_props.payload_format_indicator); + /* can have only 2 values: 0 or 1 */ + if (mpkt->data.connect.payload.will_props.payload_format_indicator > 1) + goto end; + fields |= MQTT_FN_BIT_PAYLOAD_FORMAT_INDICATOR; + break; + + case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL: + if (fields & MQTT_FN_BIT_MESSAGE_EXPIRY_INTERVAL) + goto end; + props = mqtt_read_4byte_int(istnext(props), &mpkt->data.connect.payload.will_props.message_expiry_interval); + fields |= MQTT_FN_BIT_MESSAGE_EXPIRY_INTERVAL; + break; + + case MQTT_PROP_CONTENT_TYPE: + if (fields & MQTT_FN_BIT_CONTENT_TYPE) + goto end; + props = mqtt_read_string(istnext(props), &mpkt->data.connect.payload.will_props.content_type); + fields |= MQTT_FN_BIT_CONTENT_TYPE; + break; + + case MQTT_PROP_RESPONSE_TOPIC: + if (fields & MQTT_FN_BIT_RESPONSE_TOPIC) + goto end; + props = mqtt_read_string(istnext(props), &mpkt->data.connect.payload.will_props.response_topic); + fields |= MQTT_FN_BIT_RESPONSE_TOPIC; + break; + + case MQTT_PROP_CORRELATION_DATA: + if (fields & MQTT_FN_BIT_CORRELATION_DATA) + goto end; + props = mqtt_read_string(istnext(props), &mpkt->data.connect.payload.will_props.correlation_data); + fields |= MQTT_FN_BIT_CORRELATION_DATA; + break; + + case MQTT_PROP_USER_PROPERTIES: + /* if we reached MQTT_PROP_USER_PROPERTY_ENTRIES already, then + * we start writing over the first property */ + if (user_prop_idx >= MQTT_PROP_USER_PROPERTY_ENTRIES) + user_prop_idx = 0; + + /* read user property name and value */ + props = mqtt_read_string(istnext(props), &mpkt->data.connect.payload.will_props.user_props[user_prop_idx].name); + if (!isttest(props)) + goto end; + props = mqtt_read_string(props, &mpkt->data.connect.payload.will_props.user_props[user_prop_idx].value); + ++user_prop_idx; + break; + + default: + goto end; + } + + if (!isttest(props)) + goto end; + } + } + + /* read Will Topic and Will Message (MQTT 3.1.1) or Payload (MQTT 5.0) */ + if (mpkt->data.connect.var_hdr.flags & MQTT_CONNECT_FL_WILL) { + parser = mqtt_read_string(parser, &mpkt->data.connect.payload.will_topic); + if (!isttest(parser)) + goto end; + parser = mqtt_read_string(parser, &mpkt->data.connect.payload.will_payload); + if (!isttest(parser)) + goto end; + } + + /* read User Name */ + if (mpkt->data.connect.var_hdr.flags & MQTT_CONNECT_FL_USERNAME) { + parser = mqtt_read_string(parser, &mpkt->data.connect.payload.username); + if (!isttest(parser)) + goto end; + } + + /* read Password */ + if (mpkt->data.connect.var_hdr.flags & MQTT_CONNECT_FL_PASSWORD) { + parser = mqtt_read_string(parser, &mpkt->data.connect.payload.password); + if (!isttest(parser)) + goto end; + } + + if ((orig_len - istlen(parser)) == mpkt->fixed_hdr.remaining_length) + ret = MQTT_VALID_MESSAGE; + + end: + return ret; +} + +/* Parses a CONNACK packet : + * https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718033 + * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901074 + * + * should point right after the MQTT fixed header. The remaining length + * was already checked, thus missing data is an error. On success, the result of + * the parsing is stored in . + * + * Returns: + * MQTT_INVALID_MESSAGE if the CONNECT message is invalid + * MQTT_VALID_MESSAGE if the CONNECT message looks valid + */ +static int mqtt_parse_connack(struct ist parser, struct mqtt_pkt *mpkt) +{ + /* The parser length is stored to be sure exactly consumed the announced + * remaining length. */ + size_t orig_len = istlen(parser); + int ret = MQTT_INVALID_MESSAGE; + + if (istlen(parser) < 2) + goto end; + else if (istlen(parser) == 2) + mpkt->data.connack.var_hdr.protocol_version = MQTT_VERSION_3_1_1; + else + mpkt->data.connack.var_hdr.protocol_version = MQTT_VERSION_5_0; + + /* + * parsing variable header + */ + /* read flags */ + /* bits 7 to 1 on flags are reserved and must be 0 */ + parser = mqtt_read_1byte_int(parser, &mpkt->data.connack.var_hdr.flags); + if (!isttest(parser) || (mpkt->data.connack.var_hdr.flags & 0xFE)) + goto end; + + /* read reason_code */ + parser = mqtt_read_1byte_int(parser, &mpkt->data.connack.var_hdr.reason_code); + if (!isttest(parser)) + goto end; + + /* we can leave here for MQTT 3.1.1 */ + if (mpkt->data.connack.var_hdr.protocol_version == MQTT_VERSION_3_1_1) { + if ((orig_len - istlen(parser)) == mpkt->fixed_hdr.remaining_length) + ret = MQTT_VALID_MESSAGE; + goto end; + } + + /* read properties, only available in MQTT_VERSION_5_0 */ + if (mpkt->data.connack.var_hdr.protocol_version == MQTT_VERSION_5_0) { + struct ist props; + unsigned int user_prop_idx = 0; + uint64_t fields = 0; + uint32_t plen = 0; + + parser = mqtt_read_varint(parser, &plen); + if (!isttest(parser) || istlen(parser) < plen) + goto end; + props = ist2(istptr(parser), plen); + parser = istadv(parser, props.len); + + while (istlen(props) > 0) { + switch (*istptr(props)) { + case MQTT_PROP_SESSION_EXPIRY_INTERVAL: + if (fields & MQTT_FN_BIT_SESSION_EXPIRY_INTERVAL) + goto end; + props = mqtt_read_4byte_int(istnext(props), &mpkt->data.connack.var_hdr.props.session_expiry_interval); + fields |= MQTT_FN_BIT_SESSION_EXPIRY_INTERVAL; + break; + + case MQTT_PROP_RECEIVE_MAXIMUM: + if (fields & MQTT_FN_BIT_RECEIVE_MAXIMUM) + goto end; + props = mqtt_read_2byte_int(istnext(props), &mpkt->data.connack.var_hdr.props.receive_maximum); + /* cannot be 0 */ + if (!mpkt->data.connack.var_hdr.props.receive_maximum) + goto end; + fields |= MQTT_FN_BIT_RECEIVE_MAXIMUM; + break; + + case MQTT_PROP_MAXIMUM_QOS: + if (fields & MQTT_FN_BIT_MAXIMUM_QOS) + goto end; + props = mqtt_read_1byte_int(istnext(props), &mpkt->data.connack.var_hdr.props.maximum_qos); + /* can have only 2 values: 0 or 1 */ + if (mpkt->data.connack.var_hdr.props.maximum_qos > 1) + goto end; + fields |= MQTT_FN_BIT_MAXIMUM_QOS; + break; + + case MQTT_PROP_RETAIN_AVAILABLE: + if (fields & MQTT_FN_BIT_RETAIN_AVAILABLE) + goto end; + props = mqtt_read_1byte_int(istnext(props), &mpkt->data.connack.var_hdr.props.retain_available); + /* can have only 2 values: 0 or 1 */ + if (mpkt->data.connack.var_hdr.props.retain_available > 1) + goto end; + fields |= MQTT_FN_BIT_RETAIN_AVAILABLE; + break; + + case MQTT_PROP_MAXIMUM_PACKET_SIZE: + if (fields & MQTT_FN_BIT_MAXIMUM_PACKET_SIZE) + goto end; + props = mqtt_read_4byte_int(istnext(props), &mpkt->data.connack.var_hdr.props.maximum_packet_size); + /* cannot be 0 */ + if (!mpkt->data.connack.var_hdr.props.maximum_packet_size) + goto end; + fields |= MQTT_FN_BIT_MAXIMUM_PACKET_SIZE; + break; + + case MQTT_PROP_ASSIGNED_CLIENT_IDENTIFIER: + if (fields & MQTT_FN_BIT_ASSIGNED_CLIENT_IDENTIFIER) + goto end; + props = mqtt_read_string(istnext(props), &mpkt->data.connack.var_hdr.props.assigned_client_identifier); + if (!istlen(mpkt->data.connack.var_hdr.props.assigned_client_identifier)) + goto end; + fields |= MQTT_FN_BIT_ASSIGNED_CLIENT_IDENTIFIER; + break; + + case MQTT_PROP_TOPIC_ALIAS_MAXIMUM: + if (fields & MQTT_FN_BIT_TOPIC_ALIAS_MAXIMUM) + goto end; + props = mqtt_read_2byte_int(istnext(props), &mpkt->data.connack.var_hdr.props.topic_alias_maximum); + fields |= MQTT_FN_BIT_TOPIC_ALIAS_MAXIMUM; + break; + + case MQTT_PROP_REASON_STRING: + if (fields & MQTT_FN_BIT_REASON_STRING) + goto end; + props = mqtt_read_string(istnext(props), &mpkt->data.connack.var_hdr.props.reason_string); + fields |= MQTT_FN_BIT_REASON_STRING; + break; + + case MQTT_PROP_WILDCARD_SUBSCRIPTION_AVAILABLE: + if (fields & MQTT_FN_BIT_WILDCARD_SUBSCRIPTION_AVAILABLE) + goto end; + props = mqtt_read_1byte_int(istnext(props), &mpkt->data.connack.var_hdr.props.wildcard_subscription_available); + /* can have only 2 values: 0 or 1 */ + if (mpkt->data.connack.var_hdr.props.wildcard_subscription_available > 1) + goto end; + fields |= MQTT_FN_BIT_WILDCARD_SUBSCRIPTION_AVAILABLE; + break; + + case MQTT_PROP_SUBSCRIPTION_IDENTIFIERS_AVAILABLE: + if (fields & MQTT_FN_BIT_SUBSCRIPTION_IDENTIFIER) + goto end; + props = mqtt_read_1byte_int(istnext(props), &mpkt->data.connack.var_hdr.props.subscription_identifiers_available); + /* can have only 2 values: 0 or 1 */ + if (mpkt->data.connack.var_hdr.props.subscription_identifiers_available > 1) + goto end; + fields |= MQTT_FN_BIT_SUBSCRIPTION_IDENTIFIER; + break; + + case MQTT_PROP_SHARED_SUBSRIPTION_AVAILABLE: + if (fields & MQTT_FN_BIT_SHARED_SUBSCRIPTION_AVAILABLE) + goto end; + props = mqtt_read_1byte_int(istnext(props), &mpkt->data.connack.var_hdr.props.shared_subsription_available); + /* can have only 2 values: 0 or 1 */ + if (mpkt->data.connack.var_hdr.props.shared_subsription_available > 1) + goto end; + fields |= MQTT_FN_BIT_SHARED_SUBSCRIPTION_AVAILABLE; + break; + + case MQTT_PROP_SERVER_KEEPALIVE: + if (fields & MQTT_FN_BIT_SERVER_KEEPALIVE) + goto end; + props = mqtt_read_2byte_int(istnext(props), &mpkt->data.connack.var_hdr.props.server_keepalive); + fields |= MQTT_FN_BIT_SERVER_KEEPALIVE; + break; + + case MQTT_PROP_RESPONSE_INFORMATION: + if (fields & MQTT_FN_BIT_RESPONSE_INFORMATION) + goto end; + props = mqtt_read_string(istnext(props), &mpkt->data.connack.var_hdr.props.response_information); + fields |= MQTT_FN_BIT_RESPONSE_INFORMATION; + break; + + case MQTT_PROP_SERVER_REFERENCE: + if (fields & MQTT_FN_BIT_SERVER_REFERENCE) + goto end; + props = mqtt_read_string(istnext(props), &mpkt->data.connack.var_hdr.props.server_reference); + fields |= MQTT_FN_BIT_SERVER_REFERENCE; + break; + + case MQTT_PROP_USER_PROPERTIES: + /* if we reached MQTT_PROP_USER_PROPERTY_ENTRIES already, then + * we start writing over the first property */ + if (user_prop_idx >= MQTT_PROP_USER_PROPERTY_ENTRIES) + user_prop_idx = 0; + + /* read user property name and value */ + props = mqtt_read_string(istnext(props), &mpkt->data.connack.var_hdr.props.user_props[user_prop_idx].name); + if (!isttest(props)) + goto end; + props = mqtt_read_string(props, &mpkt->data.connack.var_hdr.props.user_props[user_prop_idx].value); + ++user_prop_idx; + break; + + case MQTT_PROP_AUTHENTICATION_METHOD: + if (fields & MQTT_FN_BIT_AUTHENTICATION_METHOD) + goto end; + props = mqtt_read_string(istnext(props), &mpkt->data.connack.var_hdr.props.authentication_method); + fields |= MQTT_FN_BIT_AUTHENTICATION_METHOD; + break; + + case MQTT_PROP_AUTHENTICATION_DATA: + if (fields & MQTT_FN_BIT_AUTHENTICATION_DATA) + goto end; + props = mqtt_read_string(istnext(props), &mpkt->data.connack.var_hdr.props.authentication_data); + fields |= MQTT_FN_BIT_AUTHENTICATION_DATA; + break; + + default: + return 0; + } + + if (!isttest(props)) + goto end; + } + } + + if ((orig_len - istlen(parser)) == mpkt->fixed_hdr.remaining_length) + ret = MQTT_VALID_MESSAGE; + end: + return ret; +} + + +/* Parses and validates a MQTT packet + * https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718028 + * + * For now, due to HAProxy limitation, only validation of CONNECT and CONNACK packets + * are supported. + * + * - check FIXED_HDR + * - check remaining length + * - check variable headers and payload + * + * if is not NULL, then this structure will be filled up as well. An + * unsupported packet type is considered as invalid. It is not a problem for now + * because only the first packet on each side can be parsed (CONNECT for the + * client and CONNACK for the server). + * + * Returns: + * MQTT_INVALID_MESSAGE if the message is invalid + * MQTT_NEED_MORE_DATA if we need more data to fully validate the message + * MQTT_VALID_MESSAGE if the message looks valid + */ +int mqtt_validate_message(const struct ist msg, struct mqtt_pkt *mpkt) +{ + struct ist parser; + struct mqtt_pkt tmp_mpkt; + int ret = MQTT_INVALID_MESSAGE; + + if (!mpkt) + mpkt = &tmp_mpkt; + memset(mpkt, 0, sizeof(*mpkt)); + + parser = msg; + if (istlen(msg) < MQTT_MIN_PKT_SIZE) { + ret = MQTT_NEED_MORE_DATA; + goto end; + } + + /* parse the MQTT fixed header */ + parser = mqtt_read_fixed_hdr(parser, mpkt); + if (!isttest(parser)) { + ret = MQTT_INVALID_MESSAGE; + goto end; + } + + /* Now parsing "remaining length" field */ + parser = mqtt_read_varint(parser, &mpkt->fixed_hdr.remaining_length); + if (!isttest(parser)) { + ret = MQTT_INVALID_MESSAGE; + goto end; + } + + if (istlen(parser) < mpkt->fixed_hdr.remaining_length) + return MQTT_NEED_MORE_DATA; + + /* Now parsing the variable header and payload, which is based on the packet type */ + switch (mpkt->fixed_hdr.type) { + case MQTT_CPT_CONNECT: + ret = mqtt_parse_connect(parser, mpkt); + break; + case MQTT_CPT_CONNACK: + ret = mqtt_parse_connack(parser, mpkt); + break; + default: + break; + } + + end: + return ret; +} diff --git a/src/sample.c b/src/sample.c index 7c5951791..a9663bbfb 100644 --- a/src/sample.c +++ b/src/sample.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -3296,6 +3297,105 @@ static int sample_conv_fix_is_valid(const struct arg *arg_p, struct sample *smp, return 0; } +/* + * Extract the field value of an input binary sample containing an MQTT packet. + * Takes 2 mandatory arguments: + * - packet type + * - field name + * + * return 1 if the field was found, 0 if not. + */ +static int sample_conv_mqtt_field_value(const struct arg *arg_p, struct sample *smp, void *private) +{ + struct ist pkt, value; + int type, fieldname_id; + + pkt = ist2(smp->data.u.str.area, smp->data.u.str.data); + type = arg_p[0].data.sint; + fieldname_id = arg_p[1].data.sint; + + smp->flags &= ~SMP_F_MAY_CHANGE; + value = mqtt_field_value(pkt, type, fieldname_id); + if (!istlen(value)) { + if (isttest(value)) { + /* value != IST_NULL, need more data */ + smp->flags |= SMP_F_MAY_CHANGE; + } + return 0; + } + + smp->data.u.str = ist2buf(value); + smp->flags |= SMP_F_CONST; + return 1; +} + +/* + * this function checks the "mqtt_field_value" converter configuration. + * It expects a known packet type name or ID and a field name, in this order + * + * Args[0] will be turned into a MQTT_CPT_* value for direct maching when parsing + * a packet. + */ +static int sample_conv_mqtt_field_value_check(struct arg *args, struct sample_conv *conv, + const char *file, int line, char **err) +{ + int type, fieldname_id; + + /* check the MQTT packet type is valid */ + type = mqtt_typeid(ist2(args[0].data.str.area, args[0].data.str.data)); + if (type == MQTT_CPT_INVALID) { + memprintf(err, "Unknown MQTT type '%s'", args[0].data.str.area); + return 0; + } + + /* check the field name belongs to the MQTT packet type */ + fieldname_id = mqtt_check_type_fieldname(type, ist2(args[1].data.str.area, args[1].data.str.data)); + if (fieldname_id == MQTT_FN_INVALID) { + memprintf(err, "Unknown MQTT field name '%s' for packet type '%s'", args[1].data.str.area, + args[0].data.str.area); + return 0; + } + + /* save numeric counterparts of type and field name */ + chunk_destroy(&args[0].data.str); + chunk_destroy(&args[1].data.str); + args[0].type = ARGT_SINT; + args[0].data.sint = type; + args[1].type = ARGT_SINT; + args[1].data.sint = fieldname_id; + + return 1; +} + +/* + * Checks that contains a valid MQTT message + * + * The function returns 1 if the check was run to its end, 0 otherwise. + * The result of the analyse itself is stored in as a boolean. + */ +static int sample_conv_mqtt_is_valid(const struct arg *arg_p, struct sample *smp, void *private) +{ + struct ist msg; + + msg = ist2(smp->data.u.str.area, smp->data.u.str.data); + + smp->flags &= ~SMP_F_MAY_CHANGE; + switch (mqtt_validate_message(msg, NULL)) { + case FIX_VALID_MESSAGE: + smp->data.type = SMP_T_BOOL; + smp->data.u.sint = 1; + return 1; + case FIX_NEED_MORE_DATA: + smp->flags |= SMP_F_MAY_CHANGE; + return 0; + case FIX_INVALID_MESSAGE: + smp->data.type = SMP_T_BOOL; + smp->data.u.sint = 0; + return 1; + } + return 0; +} + /* This function checks the "strcmp" converter's arguments and extracts the * variable name and its scope. */ @@ -3888,6 +3988,10 @@ static struct sample_conv_kw_list sample_conv_kws = {ILH, { { "fix_is_valid", sample_conv_fix_is_valid, 0, NULL, SMP_T_BIN, SMP_T_BOOL }, { "fix_tag_value", sample_conv_fix_tag_value, ARG1(1,STR), sample_conv_fix_value_check, SMP_T_BIN, SMP_T_BIN }, + /* MQTT converters */ + { "mqtt_is_valid", sample_conv_mqtt_is_valid, 0, NULL, SMP_T_BIN, SMP_T_BOOL }, + { "mqtt_field_value", sample_conv_mqtt_field_value, ARG2(2,STR,STR), sample_conv_mqtt_field_value_check, SMP_T_BIN, SMP_T_STR }, + { "iif", sample_conv_iif, ARG2(2, STR, STR), NULL, SMP_T_BOOL, SMP_T_STR }, { "and", sample_conv_binary_and, ARG1(1,STR), check_operator, SMP_T_SINT, SMP_T_SINT },