Чтение только конкретных сообщений из темы кафки

Сценарий:

Я записываю данные объекта JSON в тему kafka во время чтения. Я хочу прочитать только определенный набор сообщений, основанный на значении, присутствующем в сообщении. Я использую библиотеку kafka-python.

примеры сообщений:

{flow_status: "completed", value: 1, active: yes}
{flow_status:"failure",value 2, active:yes}

Здесь я хочу читать только сообщения, имеющие flow_Status, как завершено.

3 ответа

В Кафке не возможно делать что-то подобное. Потребитель потребляет сообщения одно за другим, одно за другим, начиная с самого последнего принятого смещения (или с начала, или просматривая с определенным смещением). Зависит от вашего варианта использования, возможно, у вас может быть другой поток в вашем сценарии: сообщение с процессом, выполняемым для выполнения, переходит в тему, но затем приложение, которое обрабатывает действие, затем записывает результат (выполненный или неудачный) в двух разных темах.: таким образом у вас все выполнено отделено от неудачного. Другим способом является использование приложения Kafka Streams для выполнения фильтрации, но с учетом того, что это всего лишь сахар, в действительности приложение потоков всегда будет читать все сообщения, но позволяет легко фильтровать сообщения.

Вы можете создать две разные темы; один для выполненного и другой для статуса отказа. А затем прочитайте сообщения из законченных тем, чтобы справиться с ними.

В противном случае, если вы хотите, чтобы они были в одной теме и хотели читать только завершенные, я считаю, что вам нужно прочитать их все и игнорировать сбойные, используя простое условие if-else.

Потребитель Kafka не поддерживает такую ​​функциональность заранее. Вы должны будете использовать все события последовательно, отфильтровать статус завершенных событий и поместить его куда-нибудь. Вместо этого вы можете рассмотреть возможность использования приложения Kafka Streams, где вы можете прочитать данные в виде потока и отфильтровать события, где flow_status = "complete", и опубликовать их в какой-либо теме вывода или в другом месте назначения.

Пример:

KStream<String,JsonNode> inputStream= builder.stream(inputTopic);
KStream<String,JsonNode> completedFlowStream = inputStream.filter(value-> value.get("flow_status").equals("completed"));

PS Кафка не имеет официального релиза для Python API для KStream, но есть проект с открытым исходным кодом: https://github.com/wintoncode/winton-kafka-streams

На сегодняшний день невозможно достичь этого на стороне брокера, есть запрос функции Jira, открытый для apache kafka, чтобы реализовать эту функцию, вы можете отслеживать его здесь, я надеюсь, что они будут реализованы в ближайшем будущем:https://issues.apache.org/jira/browse/KAFKA-6020

Я считаю, что лучший способ - использовать интерфейс RecordFilterStrategy (Java/spring) и фильтровать его на стороне потребителя.

Другие вопросы по тегам