Confluent .net (rdkafka) коммит на потребительской утилизации

У слитного продвинутого потребителя здесь есть следующий код (сокращенный для краткости).

        using (var consumer = new Consumer<Null, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
        {
            while (!cancelled)
            {
                Message<Null, string> msg;
                if (!consumer.Consume(out msg, TimeSpan.FromMilliseconds(100)))
                {
                    continue;
                }                    

                if (msg.Offset % 5 == 0)
                {                        
                    consumer.CommitAsync(msg).Result;                        
                }
            }
        }

Авто-фиксация ложна. Мой вопрос заключается в том, что произойдет, если триггер "отменен" помечен, в то время как еще не завершены коммиты. Сообщения оставлены незафиксированными и, следовательно, будут получены снова? Я надеялся, что потребитель согласится на утилизацию, но я не вижу ничего подобного в реализации. Я могу сделать несколько тестов, чтобы увидеть, что происходит, но я надеялся на "официальный" ответ, если мои тесты не охватывают все случаи.

1 ответ

Первое примечание, вы должны использовать CommitAsync только когда enable.auto.commit установлен в false (это имеет место в примере Продвинутого потребителя - для этого нужны дополнительные комментарии и ввод вики). Если вы используете автоматическую фиксацию (по умолчанию), вы будете автоматически фиксировать при утилизации (это на стороне librdkafka при вызове rd_kafka_destroy)

Здесь нас ждут CommitAsync завершить (через .Result) и он вызывается в том же потоке, что и цикл while, поэтому повторные запросы не будут выполняться.

Если вы используете ручную фиксацию (это имеет место в полном фрагменте кода), вам придется фиксировать вручную, прежде чем утилизировать - это действительно отсутствует в примере, добавит его. Как уже говорилось, при ручной фиксации вы должны фиксировать себя (есть случаи, когда пользователь не захочет фиксировать при утилизации)

Я также только что обнаружил ошибку, если вы вызываете CommitAsync, но не ждете его перед удалением, возможно, у вас застрял поток или возникла исключительная ситуация accessViolationException, которая исправит это ( https://github.com/confluentinc/confluent-kafka-dotnet/issues/279) Таким образом, вы должны всегда ждать завершения CommitAsync, прежде чем утилизировать

Другие вопросы по тегам