Возврат от потребителя Кафки, когда нет сообщения

Я хочу обработать тему при запуске приложения, используя клиент Confluent dotnet. Предположим следующий пример:

    while (true)
    {
        try
        {
            var cr = c.Consume();
            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Error occured: {e.Error.Reason}");
        }
    }

Если в Kafka нет нового сообщения, c.Consume будет заблокирован. Поскольку я хочу использовать его для запуска приложения (например, для разогрева кэша), я хочу продолжить свой код, когда обнаружил, что нового сообщения нет.

Я знаю, что есть перегрузка для установки тайм-аута, как c.Consume(timeout) но проблема с этим подходом состоит в том, что если у вас есть сообщение в вашей теме, и длительность чтения сообщения была больше, чем ваш тайм-аут, вы получите нулевой вывод, что нежелательно.

3 ответа

Решение

Потребитель (и) не должен знать о производителе (ах).

Теперь, если вы хотите знать, что вы прочитали все в теме с момента, когда вы начали потреблять, вы можете:

  1. Загрузите новейшее смещение перед началом потребления.
  2. Тогда начните потреблять сообщения.
  3. Если смещение сообщения совпадает с последним смещением, которое вы загрузили ранее, прекратите потребление.

я не C# developper, но из того, что я прочитал в DotNet Confluent Doc вы можете позвонить QueryWatermarkOffsetsна потребителя, чтобы получить самую старую и новейшую компенсацию. https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Consumer.html

А потом, на Messageкласс у вас есть Offset сбруя. Таким образом, все это не должно быть слишком сложным для достижения. https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Message.html

Ты можешь использовать OnPartitionEOF событие, которое указывает, что вы достигли конца раздела.

CancellationTokenSource source = new CancellationTokenSource();
bool isContinue = true;

c.OnPartitionEOF += (o, e) =>
    {
        Console.WriteLine($"You have reached end of partition");
        isContinue = false;
        source.Cancel();
    };    
while (isContinue)
{
    try
    {
        var cr = c.Consume(source.Token);
        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
    }
    catch (ConsumeException e)
    {
        Console.WriteLine($"Error occured: {e.Error.Reason}");
    }
}

Я нашел Consumer.IsPartitionEOF полезным.

Другие вопросы по тегам