Как подписаться на список из нескольких шаблонов подстановочных знаков kafka, используя kafka-python?

Я подписываюсь на Kafka, используя шаблон с подстановочным знаком, как показано ниже. Подстановочный знак представляет динамический идентификатор клиента.

consumer.subscribe(pattern='customer.*.validations')

Это хорошо работает, потому что я могу вытащить идентификатор клиента из строки темы. Но теперь мне нужно расширить функциональность, чтобы прослушать похожую тему для немного другой цели. Давайте назовем это customer.*.additional-validations, Код должен жить в одном и том же проекте, потому что используется много функциональности, но мне нужно иметь возможность выбрать другой путь в зависимости от типа очереди.

В документации Kafka я вижу, что можно подписаться на множество тем. Однако это жестко закодированные строки. Не шаблоны, которые учитывают гибкость.

>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)

Поэтому мне интересно, можно ли как-нибудь сделать комбинацию из двух? Вроде как (нерабочий):

consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])

1 ответ

Решение

В коде KafkaConsumer он поддерживает список тем или шаблон,

https://github.com/dpkp/kafka-python/blob/68c8fa4ad01f8fef38708f257cb1c261cfac01ab/kafka/consumer/group.py

   def subscribe(self, topics=(), pattern=None, listener=None):
        """Subscribe to a list of topics, or a topic regex pattern
        Partitions will be dynamically assigned via a group coordinator.
        Topic subscriptions are not incremental: this list will replace the
        current assignment (if there is one).

Таким образом, вы можете создать регулярное выражение с условием ИЛИ, используя |, который должен работать как подписка на несколько динамических регулярных выражений, так как он внутренне использует re модуль для сопоставления.

(customer.*.validations)|(customer.*.additional-validations)

В библиотеке Confluent Kafka у подписчика нет pattern ключевое слово, но вместо этого будет обрабатывать шаблоны регулярных выражений, начинающиеся с ^.

def subscribe(self, topics, on_assign=None, *args, **kwargs):
    """
    Set subscription to a supplied list of topics
    This replaces a previous subscription.
        
    Regexp pattern subscriptions are supported by prefixing the topic string with ``"^"``, e.g.::
        
        consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
    """
Другие вопросы по тегам