Чтение только конкретных сообщений из темы кафки
Сценарий:
Я записываю данные объекта JSON в тему kafka во время чтения. Я хочу прочитать только определенный набор сообщений, основанный на значении, присутствующем в сообщении. Я использую библиотеку kafka-python.
примеры сообщений:
{flow_status: "completed", value: 1, active: yes}
{flow_status:"failure",value 2, active:yes}
Здесь я хочу читать только сообщения, имеющие flow_Status, как завершено.
3 ответа
В Кафке не возможно делать что-то подобное. Потребитель потребляет сообщения одно за другим, одно за другим, начиная с самого последнего принятого смещения (или с начала, или просматривая с определенным смещением). Зависит от вашего варианта использования, возможно, у вас может быть другой поток в вашем сценарии: сообщение с процессом, выполняемым для выполнения, переходит в тему, но затем приложение, которое обрабатывает действие, затем записывает результат (выполненный или неудачный) в двух разных темах.: таким образом у вас все выполнено отделено от неудачного. Другим способом является использование приложения Kafka Streams для выполнения фильтрации, но с учетом того, что это всего лишь сахар, в действительности приложение потоков всегда будет читать все сообщения, но позволяет легко фильтровать сообщения.
Вы можете создать две разные темы; один для выполненного и другой для статуса отказа. А затем прочитайте сообщения из законченных тем, чтобы справиться с ними.
В противном случае, если вы хотите, чтобы они были в одной теме и хотели читать только завершенные, я считаю, что вам нужно прочитать их все и игнорировать сбойные, используя простое условие if-else.
Потребитель Kafka не поддерживает такую функциональность заранее. Вы должны будете использовать все события последовательно, отфильтровать статус завершенных событий и поместить его куда-нибудь. Вместо этого вы можете рассмотреть возможность использования приложения Kafka Streams, где вы можете прочитать данные в виде потока и отфильтровать события, где flow_status = "complete", и опубликовать их в какой-либо теме вывода или в другом месте назначения.
Пример:
KStream<String,JsonNode> inputStream= builder.stream(inputTopic);
KStream<String,JsonNode> completedFlowStream = inputStream.filter(value-> value.get("flow_status").equals("completed"));
PS Кафка не имеет официального релиза для Python API для KStream, но есть проект с открытым исходным кодом: https://github.com/wintoncode/winton-kafka-streams
На сегодняшний день невозможно достичь этого на стороне брокера, есть запрос функции Jira, открытый для apache kafka, чтобы реализовать эту функцию, вы можете отслеживать его здесь, я надеюсь, что они будут реализованы в ближайшем будущем:https://issues.apache.org/jira/browse/KAFKA-6020
Я считаю, что лучший способ - использовать интерфейс RecordFilterStrategy (Java/spring) и фильтровать его на стороне потребителя.