Некоторые потребители Python Confluent Kafka остаются простаивающими / неназначенными, хотя другие перегружены / переназначены

Настроить:

  • 120 потребителей python confluent-kafka, которые подписываются на один и тот же набор тем.
  • 8 тем с разным количеством разделов: 1 тема с 84 разделами, несколько тем с 40-50 разделами и остальные с 1-10 разделами. Общее количество перегородок около 300.

Я использую довольно стандартный код подписки:

def __init__(self, kafka_broker_list: str, group_id: str, topics: List[str]):
        from confluent_kafka import Consumer
        self._consumer = Consumer({
            'bootstrap.servers': kafka_broker_list,
            'fetch.max.bytes': 50 * 1024 * 1024,  # 50MB
            'auto.offset.reset': 'earliest',
            'group.id': group_id,
            'enable.auto.commit': True
        })
        logging.info(f"Subscribing for topics: {topics}")
        self._consumer.subscribe(topics, on_assign=self._on_assign, on_revoke=self._on_revoke)

Проблема: из 120 потребителей, которые я запускаю, только 84 (то же количество, что и количество разделов самой большой темы) получают назначение разделов - остальные остаются без назначения разделов и, таким образом, остаются простаивающими. Что еще хуже, я обычно получаю 5 потребителей с ~ 10 назначенными разделами, некоторые с 8, много с 2-3-4, а также много потребителей с назначенным только одним разделом. Я считаю, что "первые" потребители, которые подписываются, получают наибольшее количество тем, пока не будут исчерпаны доступные разделы для каждой темы.

Вопросы:

  1. Я читал о partition.assignment.strategyсвойство конфигурации, которое доступно для пользователей Java, однако я не смог найти его в Confluent Kafka Client. Итак, есть ли способ настроить стратегию назначения в Confluent Kafka Python Client?
  2. Есть ли способ установить стратегию назначения разделов на сервере, по теме или по идентификатору группы?
  3. Альтернативно Есть ли другой способ распределить нагрузку между всеми потребителями?

Спасибо, что нашли время прочитать мой вопрос:)

2 ответа

Решение

Клиент python confluent-kafka внутренне использует библиотеку librdkafka, которая фактически позволяет настраивать стратегию присваивания. В настоящее время поддерживаются две стратегии присваивания: "диапазон" (по умолчанию) и "раундробин", который решает описанную мной проблему.

Он настраивается путем добавления следующего свойства конфигурации в конфигурацию потребителя:

'partition.assignment.strategy': 'roundrobin',

Документация по всем свойствам librdkafka доступна здесь:https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Теперь вы можете использовать кооперативную стратегию перебалансировки для повышения производительности:

      'partition.assignment.strategy': 'cooperativesticky'
Другие вопросы по тегам