Непредсказуемое поведение Paho C++ при отключении сети
Я реализую модуль постоянства с использованием Paho C++, который удаляет запись из базы данных только после того, как соответствующие данные были успешно опубликованы в облаке. Таким образом, вызов удаления в базу данных запускается из обратного вызова on_success для публикации. Неполный отредактированный фрагмент кода ниже:
void mqtt_module::start() {
connect();
while (!kill_flag) {
if(client->is_connected()) {
records = read_persistance_database();
if (records.size() > 0) {
for (std::vector<record>::iterator it = records.begin(); it != records.end(); ++it) {
try {
int* rowid;
rowid = new int;
*rowid = it->rowid;
auto pubmsg = mqtt::make_message(it->topic, raw_data);
pubmsg->set_qos(it->qos);
logger->debug("[{}] : [MQTT] publish to topic {} message {}", LOG_TAG,it->topic,raw_data);
mqtt::delivery_token_ptr pubtok = client->publish(pubmsg, static_cast<void*>(rowid), *this);
bool publish_success = pubtok->wait_for(15000); //keeping this higher than connection-timeout+keep-alive so that connection timeout happens before publish timeout
if(!publish_success)
{
break;//Read the records again
}
} catch (const mqtt::exception& exc) {
break; //If publish fails, break and read the DB again. this way messages are not sent out of sequence
}
}
records.clear();
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
}
}
void mqtt_module::on_success(const mqtt::token& tok) {
int* rowid;
rowid = static_cast<int*>(tok.get_user_context());
auto top = tok.get_topics();
delete_record_from_database(*rowid);
}
bool mqtt_module::connect() {
//callback cb;
client->set_callback(*this);
mqtt::connect_options connopts(config->mqtt_username, config->mqtt_password);
connopts.set_keep_alive_interval(3);
connopts.set_connect_timeout(1);
connopts.set_automatic_reconnect(true);
connopts.set_clean_session(false);
mqtt::ssl_options sslopts;
sslopts.set_trust_store(config->mqtt_ca_path);
mqtt::message willmsg(LWT_TOPIC, LWT_PAYLOAD, QOS, true);
mqtt::will_options will(willmsg);
connopts.set_will(will);
connopts.set_ssl(sslopts);
try {
mqtt::token_ptr conntok = client->connect(connopts);
conntok->wait();
} catch (const mqtt::exception& exc) {
logger->debug(exc.what());
return false;
}
return true;
}
void mqtt_module::connection_lost(const string& cause) {
}
void mqtt_module::connected(const std::string& cause) {
}
void mqtt_module::delivery_complete(mqtt::delivery_token_ptr tok) {
}
Когда я пытаюсь смоделировать проблему с сетью, отключив сетевой интерфейс, paho попытается опубликовать сообщение (QOS 2) в течение нескольких секунд, пока не произойдет отключение MQTT. Иногда этот вызов публикации запускает обратный вызов on_success, даже если полезная нагрузка не достигает облачного брокера. И когда-то я видел противоположное поведение, когда обратный вызов on_success не вызывается, даже когда полезная нагрузка достигла облачного брокера. Может ли кто-нибудь помочь мне оптимизировать это таким образом, чтобы не было потери или дублирования данных?