Плохое чтение/запись сокета на клиенте для LwIP, MQTT, STM32 и RTOS

Цель:

  • Публиковать и подписываться на темы, чтобы я мог получать и отправлять данные с микроконтроллера STM32F4 по MQTT, используя: среду STM32F4 Hal, LwIP, FreeRTOS и ENC8742 IC (RMII)
  • Я использую плату разработки Nucleo-F429ZI.

Проблема:

Когда я звоню example_do_connect(client, &mqttServerIP);(см. файл lwip_mqtt.c ниже) Я получаю следующую ошибку в моем брокере MQTT, работающем под Windows:

      1643926497: New connection from 192.168.1.101:52432 on port 1883.
1643926497: New client connected from 192.168.1.101:52432 as STM32-nucleo (p2, c1, k0).
1643926497: No will message specified.
1643926497: Sending CONNACK to STM32-nucleo (0, 2)
1643926497: Bad socket read/write on client STM32-nucleo: Invalid arguments provided.

( example_publish(client, mqtt_msg);ничего не делает, но я ожидаю, что это связано с рассматриваемой проблемой).

Что я сделал:

  • Установите Mosquitto и запустите mosquitto -v -c .\mqtt_allowAnonym.confгде файлы .conf содержат: listener 1883и allow_anonymous true

  • Создайте проект с помощью CubeMX. CubeMX настроен с использованием Ethernet, FreeRTOS и LwIP. Это серия видеороликов, по которым я следил за настройкой LwIP и Ethernet: ссылка

  • FreeRTOS была настроена в конфигурации по умолчанию с CMSISV2 и heap4.

  • Для компиляции и отладки проекта использовались Visual Studio и Visual GDB.

  • Создал файлы lwip_mqtt.h и .c из этого источника: (ссылочный код здесь)

Код:

Поток Ethernet:

      /* Ethernet_thread.c */

// Includes: ----------------------------------------------------------
#include "EthernetLwIP_thread.h"
#include "config.h"


// Instantiations: ----------------------------------------------------

// Thread declaration: ------------------------------------------------
osThreadId_t EthernetModule_ThreadId;
const osThreadAttr_t EthernetModule_Attributes = {
    .name = "EthernetModule",
    .priority = (osPriority_t) osPriorityHigh,
    .stack_size = 1024 * 4                                                  // This needs to be optimized at a later stage
};
void EthernetModule_Thread(void *argument);

// Functions: --------------------------------------------------------

// Initializing functions: ---------------------------------------------------------------------
MResult EthernetModule_HardwareInit()
{
    return Result_Successful;
}

MResult EthernetModule_Init() {
    MX_LWIP_Init();              // This had to be moved to here, because CubeMX auto generated this to be located in the default thread, but if this line is left there the RTOS threads dont operate as expected. 

    if (EthernetModule_HardwareInit() == Result_Error)
        return Result_Error;

    EthernetModule_ThreadId = osThreadNew(EthernetModule_Thread, NULL, &EthernetModule_Attributes);
    if (EthernetModule_ThreadId == NULL)
        return Result_Error;
    
    return Result_Successful;
}

// Thread: ---------------------------------------------------------------------------------------
void EthernetModule_Thread(void *argument)
{
    // Task timing
    TickType_t xLastWakeTime;
    xLastWakeTime = xTaskGetTickCount();
    const TickType_t xDelay_Ticks = 500; // Current RTOS clock config has 1 tick equal 1ms. Thus 20 ticks sets a frequency of 50Hz

    mqtt_client_t *client;
    client = mqtt_client_new();

    ip_addr_t mqttServerIP;
    IP4_ADDR(&mqttServerIP, 192, 168, 1, 2);
    char topicName[] = "LwIP_FreeRTOS_MQTT_Test";

    if (client != NULL)
    {
        char mqtt_msg[1000];
        sprintf(mqtt_msg, "hello_pc");
        example_do_connect(client, &mqttServerIP);
        example_publish(client, mqtt_msg);
    }


    uint32_t someVariable;
    while (1)
    {   
        vTaskDelayUntil(&xLastWakeTime, xDelay_Ticks); // waits until a certain number of ticks have passed before it starts this task again in a timely manner
        
        HAL_GPIO_TogglePin(LED_GREEN_PORT, LED_GREEN_PIN);
        // Memory debug API's
        //size_t freeMinHeapSize = xPortGetMinimumEverFreeHeapSize();
        //HeapStats_t myHeapStats;
        //vPortGetHeapStats(&myHeapStats);
    }
}

lwip_mqtt.h:

      #ifndef LWIP_MQTT_H
#define LWIP_MQTT_H

#include "mqtt.h"

static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len);
static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags);
static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status);
static void mqtt_sub_request_cb(void *arg, err_t result);
static void mqtt_pub_request_cb(void *arg, err_t result);
void example_do_connect(mqtt_client_t *client, const ip_addr_t *ip_addr);
void example_publish(mqtt_client_t *client, void *arg);

#endif //LWIP_MQTT_H

lwip_mqtt.c

      #include "lwip/apps/mqtt.h"
#include <string.h>
#include "stm32f4xx_hal.h"

/* Notes:
 * Connection to server can also be probed by calling mqtt_client_is_connected(client)
 * To disconnect, simply call mqtt_disconnect(client)
 * 
*/



static void mqtt_sub_request_cb(void *arg, err_t result)
{
    /* Just print the result code here for simplicity, 
       normal behaviour would be to take some action if subscribe fails like 
       notifying user, retry subscribe or disconnect from server */
    printf("Subscribe result: %d\n", result);
}

/* Called when publish is complete either with sucess or failure */
static void mqtt_pub_request_cb(void *arg, err_t result)
{
    if (result != ERR_OK) {
        printf("Publish result: %d\n", result);
    }
}

/* The idea is to demultiplex topic and create some reference to be used in data callbacks
   Example here uses a global variable, better would be to use a member in arg
   If RAM and CPU budget allows it, the easiest implementation might be to just take a copy of
   the topic string and use it in mqtt_incoming_data_cb
*/
static int inpub_id;
static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len)
{
    printf("Incoming publish at topic %s with total length %u\n", topic, (unsigned int)tot_len);

    /* Decode topic string into a user defined reference */
    if (strcmp(topic, "print_payload") == 0) {
        inpub_id = 0;
    }
    else if (topic[0] == 'A') {
        /* All topics starting with 'A' might be handled at the same way */
        inpub_id = 1;
    }
    else {
        /* For all other topics */
        inpub_id = 2;
    }
}

static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags)
{
    printf("Incoming publish payload with length %d, flags %u\n", len, (unsigned int)flags);

    if (flags & MQTT_DATA_FLAG_LAST) {
        /* Last fragment of payload received (or whole part if payload fits receive buffer
           See MQTT_VAR_HEADER_BUFFER_LEN)  */

        /* Call function or do action depending on reference, in this case inpub_id */
        if (inpub_id == 0) {
            /* Don't trust the publisher, check zero termination */
            if (data[len - 1] == 0) {
                printf("mqtt_incoming_data_cb: %s\n", (const char *)data);
            }
        }
        else if (inpub_id == 1) {
            /* Call an 'A' function... */
        }
        else {
            printf("mqtt_incoming_data_cb: Ignoring payload...\n");
        }
    }
    else {
        /* Handle fragmented payload, store in buffer, write to file or whatever */
    }
}

static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
{
    err_t err;
    if (status == MQTT_CONNECT_ACCEPTED) {
        printf("mqtt_connection_cb: Successfully connected\n");
    
        /* Setup callback for incoming publish requests */
        mqtt_set_inpub_callback(client, mqtt_incoming_publish_cb, mqtt_incoming_data_cb, arg);
    
        /* Subscribe to a topic named "subtopic" with QoS level 1, call mqtt_sub_request_cb with result */ 
        err = mqtt_subscribe(client, "subtopic", 1, mqtt_sub_request_cb, arg);

        if (err != ERR_OK) {
            printf("mqtt_subscribe return: %d\n", err);
        }
    }
    else {
        printf("mqtt_connection_cb: Disconnected, reason: %d\n", status);
    
        /* Its more nice to be connected, so try to reconnect */
        //example_do_connect(client);
    }  
}

void example_do_connect(mqtt_client_t *client, const ip_addr_t *ip_addr)
{
  struct mqtt_connect_client_info_t ci;
  err_t err;
  
  /* Setup an empty client info structure */
  memset(&ci, 0, sizeof(ci));
  
  /* Minimal amount of information required is client identifier, so set it here */ 
  ci.client_id = "STM32-nucleo";
  
  /* Initiate client and connect to server, if this fails immediately an error code is returned
     otherwise mqtt_connection_cb will be called with connection result after attempting 
     to establish a connection with the server. 
     For now MQTT version 3.1.1 is always used */
  
  err = mqtt_client_connect(client, ip_addr, MQTT_PORT, mqtt_connection_cb, 0, &ci);
  
  /* For now just print the result code if something goes wrong*/
  if(err != ERR_OK) {
    printf("mqtt_connect return %d\n", err);
  }
}

void example_publish(mqtt_client_t *client, void *arg)
{
    const char *pub_payload = "PubSubHubLubJub";
    err_t err;
    u8_t qos = 2; /* 0 1 or 2, see MQTT specification */
    u8_t retain = 0; /* No don't retain such crappy payload... */
    err = mqtt_publish(client, "pub_topic", pub_payload, strlen(pub_payload), qos, retain, mqtt_pub_request_cb, arg);
    if (err != ERR_OK) {
        printf("Publish err: %d\n", err);
    }
}

0 ответов

Другие вопросы по тегам