Непредсказуемое поведение Paho C++ при отключении сети

Я реализую модуль постоянства с использованием Paho C++, который удаляет запись из базы данных только после того, как соответствующие данные были успешно опубликованы в облаке. Таким образом, вызов удаления в базу данных запускается из обратного вызова on_success для публикации. Неполный отредактированный фрагмент кода ниже:

void mqtt_module::start() {

    connect();

    while (!kill_flag) {
        if(client->is_connected()) {
        records  = read_persistance_database();
            if (records.size() > 0) {
                for (std::vector<record>::iterator it = records.begin(); it != records.end(); ++it) {
                    try {
                        int* rowid;
                        rowid = new int;
                        *rowid = it->rowid;
                        auto pubmsg = mqtt::make_message(it->topic, raw_data);
                        pubmsg->set_qos(it->qos);
                        logger->debug("[{}] : [MQTT] publish to topic {} message {}", LOG_TAG,it->topic,raw_data);
                        mqtt::delivery_token_ptr pubtok = client->publish(pubmsg, static_cast<void*>(rowid), *this);
                        bool publish_success = pubtok->wait_for(15000); //keeping this higher than connection-timeout+keep-alive so that connection timeout happens before publish timeout
                        if(!publish_success)
                        {
                            break;//Read the records again
                        }

                    } catch (const mqtt::exception& exc) {
                        break;  //If publish fails, break and read the DB again. this way messages are not sent out of sequence
                    }
                }
                records.clear();
            }
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    }
}

void mqtt_module::on_success(const mqtt::token& tok) {
    int* rowid;
    rowid = static_cast<int*>(tok.get_user_context());
    auto top = tok.get_topics();

    delete_record_from_database(*rowid);
}


bool mqtt_module::connect() {
    //callback cb;
    client->set_callback(*this);

    mqtt::connect_options connopts(config->mqtt_username, config->mqtt_password);

    connopts.set_keep_alive_interval(3);
    connopts.set_connect_timeout(1);
    connopts.set_automatic_reconnect(true);
    connopts.set_clean_session(false);

    mqtt::ssl_options sslopts;
    sslopts.set_trust_store(config->mqtt_ca_path);

    mqtt::message willmsg(LWT_TOPIC, LWT_PAYLOAD, QOS, true);
    mqtt::will_options will(willmsg);

    connopts.set_will(will);
    connopts.set_ssl(sslopts);

    try {
        mqtt::token_ptr conntok = client->connect(connopts);
        conntok->wait();
    } catch (const mqtt::exception& exc) {
        logger->debug(exc.what());
        return false;
    }

    return true;
}

void mqtt_module::connection_lost(const string& cause) {

}

void mqtt_module::connected(const std::string& cause) {

}

void mqtt_module::delivery_complete(mqtt::delivery_token_ptr tok) {

}

Когда я пытаюсь смоделировать проблему с сетью, отключив сетевой интерфейс, paho попытается опубликовать сообщение (QOS 2) в течение нескольких секунд, пока не произойдет отключение MQTT. Иногда этот вызов публикации запускает обратный вызов on_success, даже если полезная нагрузка не достигает облачного брокера. И когда-то я видел противоположное поведение, когда обратный вызов on_success не вызывается, даже когда полезная нагрузка достигла облачного брокера. Может ли кто-нибудь помочь мне оптимизировать это таким образом, чтобы не было потери или дублирования данных?

0 ответов

Другие вопросы по тегам