Как создать тему кафки, используя пакет сообщений или буфер с pykafka
Как создать тему кафки используя пакет сообщений или буфер с пикафкой. Я имею в виду, что один производитель может создать много сообщений в одном процессе. Я знаю концепцию, используя сообщение пакетного или буферного сообщения, но я не знаю, как его реализовать. Я надеюсь, что кто-то может помочь мне здесь
2 ответа
PyKafka прозрачно обрабатывает пакетирование сообщения в производителе - вам не нужно делать ничего особенного, чтобы убедиться, что сообщения генерируются партиями. Producer
Класс предлагает кучу опций конфигурации, чтобы позволить вам настроить поведение пакетирования. Полный список этих параметров доступен в документации, но некоторые из наиболее важных:
max_queued_messages
- когда тыproduce()
D больше сообщений, чем это, отправьте партию немедленноmin_queued_messages
- когда тыproduce()
буду хотя бы столько сообщений отправлять партиюlinger_ms
- если с момента последней партии прошло столько времени, отправьте партию
Просто используйте send()
метод. Вам не нужно управлять этим самостоятельно.
send () является асинхронным. При вызове он добавляет запись в буфер ожидающих отправки записей и сразу же возвращает. Это позволяет производителю объединять отдельные записи для повышения эффективности.
Ваша задача состоит только в том, чтобы настроить два подпорки об этом: batch_size и linger_ms.
Производитель поддерживает буферы неотправленных записей для каждого раздела. Эти буферы имеют размер, указанный в конфигурации "batch_size". Увеличение этого размера может привести к увеличению пакетной обработки, но требует больше памяти (поскольку у нас обычно будет один из этих буферов для каждого активного раздела).
Два подпорки будут сделаны способом ниже:
как только мы получим batch_size записей для раздела, он будет отправлен немедленно, независимо от этого параметра, однако, если у нас будет накоплено меньше этого количества байтов для этого раздела, мы будем "задерживаться" в течение указанного времени, ожидая появления новых записей.