Возврат от потребителя Кафки, когда нет сообщения
Я хочу обработать тему при запуске приложения, используя клиент Confluent dotnet. Предположим следующий пример:
while (true)
{
try
{
var cr = c.Consume();
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
Если в Kafka нет нового сообщения, c.Consume будет заблокирован. Поскольку я хочу использовать его для запуска приложения (например, для разогрева кэша), я хочу продолжить свой код, когда обнаружил, что нового сообщения нет.
Я знаю, что есть перегрузка для установки тайм-аута, как c.Consume(timeout)
но проблема с этим подходом состоит в том, что если у вас есть сообщение в вашей теме, и длительность чтения сообщения была больше, чем ваш тайм-аут, вы получите нулевой вывод, что нежелательно.
3 ответа
Потребитель (и) не должен знать о производителе (ах).
Теперь, если вы хотите знать, что вы прочитали все в теме с момента, когда вы начали потреблять, вы можете:
- Загрузите новейшее смещение перед началом потребления.
- Тогда начните потреблять сообщения.
- Если смещение сообщения совпадает с последним смещением, которое вы загрузили ранее, прекратите потребление.
я не C#
developper, но из того, что я прочитал в DotNet Confluent Doc вы можете позвонить QueryWatermarkOffsets
на потребителя, чтобы получить самую старую и новейшую компенсацию. https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Consumer.html
А потом, на Message
класс у вас есть Offset
сбруя. Таким образом, все это не должно быть слишком сложным для достижения. https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Message.html
Ты можешь использовать OnPartitionEOF
событие, которое указывает, что вы достигли конца раздела.
CancellationTokenSource source = new CancellationTokenSource();
bool isContinue = true;
c.OnPartitionEOF += (o, e) =>
{
Console.WriteLine($"You have reached end of partition");
isContinue = false;
source.Cancel();
};
while (isContinue)
{
try
{
var cr = c.Consume(source.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}