Confluent .net (rdkafka) коммит на потребительской утилизации
У слитного продвинутого потребителя здесь есть следующий код (сокращенный для краткости).
using (var consumer = new Consumer<Null, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
{
while (!cancelled)
{
Message<Null, string> msg;
if (!consumer.Consume(out msg, TimeSpan.FromMilliseconds(100)))
{
continue;
}
if (msg.Offset % 5 == 0)
{
consumer.CommitAsync(msg).Result;
}
}
}
Авто-фиксация ложна. Мой вопрос заключается в том, что произойдет, если триггер "отменен" помечен, в то время как еще не завершены коммиты. Сообщения оставлены незафиксированными и, следовательно, будут получены снова? Я надеялся, что потребитель согласится на утилизацию, но я не вижу ничего подобного в реализации. Я могу сделать несколько тестов, чтобы увидеть, что происходит, но я надеялся на "официальный" ответ, если мои тесты не охватывают все случаи.
1 ответ
Первое примечание, вы должны использовать CommitAsync
только когда enable.auto.commit
установлен в false
(это имеет место в примере Продвинутого потребителя - для этого нужны дополнительные комментарии и ввод вики). Если вы используете автоматическую фиксацию (по умолчанию), вы будете автоматически фиксировать при утилизации (это на стороне librdkafka при вызове rd_kafka_destroy)
Здесь нас ждут CommitAsync
завершить (через .Result
) и он вызывается в том же потоке, что и цикл while, поэтому повторные запросы не будут выполняться.
Если вы используете ручную фиксацию (это имеет место в полном фрагменте кода), вам придется фиксировать вручную, прежде чем утилизировать - это действительно отсутствует в примере, добавит его. Как уже говорилось, при ручной фиксации вы должны фиксировать себя (есть случаи, когда пользователь не захочет фиксировать при утилизации)
Я также только что обнаружил ошибку, если вы вызываете CommitAsync, но не ждете его перед удалением, возможно, у вас застрял поток или возникла исключительная ситуация accessViolationException, которая исправит это ( https://github.com/confluentinc/confluent-kafka-dotnet/issues/279) Таким образом, вы должны всегда ждать завершения CommitAsync, прежде чем утилизировать