Некоторые потребители Python Confluent Kafka остаются простаивающими / неназначенными, хотя другие перегружены / переназначены
Настроить:
- 120 потребителей python confluent-kafka, которые подписываются на один и тот же набор тем.
- 8 тем с разным количеством разделов: 1 тема с 84 разделами, несколько тем с 40-50 разделами и остальные с 1-10 разделами. Общее количество перегородок около 300.
Я использую довольно стандартный код подписки:
def __init__(self, kafka_broker_list: str, group_id: str, topics: List[str]):
from confluent_kafka import Consumer
self._consumer = Consumer({
'bootstrap.servers': kafka_broker_list,
'fetch.max.bytes': 50 * 1024 * 1024, # 50MB
'auto.offset.reset': 'earliest',
'group.id': group_id,
'enable.auto.commit': True
})
logging.info(f"Subscribing for topics: {topics}")
self._consumer.subscribe(topics, on_assign=self._on_assign, on_revoke=self._on_revoke)
Проблема: из 120 потребителей, которые я запускаю, только 84 (то же количество, что и количество разделов самой большой темы) получают назначение разделов - остальные остаются без назначения разделов и, таким образом, остаются простаивающими. Что еще хуже, я обычно получаю 5 потребителей с ~ 10 назначенными разделами, некоторые с 8, много с 2-3-4, а также много потребителей с назначенным только одним разделом. Я считаю, что "первые" потребители, которые подписываются, получают наибольшее количество тем, пока не будут исчерпаны доступные разделы для каждой темы.
Вопросы:
- Я читал о
partition.assignment.strategy
свойство конфигурации, которое доступно для пользователей Java, однако я не смог найти его в Confluent Kafka Client. Итак, есть ли способ настроить стратегию назначения в Confluent Kafka Python Client? - Есть ли способ установить стратегию назначения разделов на сервере, по теме или по идентификатору группы?
- Альтернативно Есть ли другой способ распределить нагрузку между всеми потребителями?
Спасибо, что нашли время прочитать мой вопрос:)
2 ответа
Клиент python confluent-kafka внутренне использует библиотеку librdkafka, которая фактически позволяет настраивать стратегию присваивания. В настоящее время поддерживаются две стратегии присваивания: "диапазон" (по умолчанию) и "раундробин", который решает описанную мной проблему.
Он настраивается путем добавления следующего свойства конфигурации в конфигурацию потребителя:
'partition.assignment.strategy': 'roundrobin',
Документация по всем свойствам librdkafka доступна здесь:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Теперь вы можете использовать кооперативную стратегию перебалансировки для повышения производительности:
'partition.assignment.strategy': 'cooperativesticky'