Как подписаться на список из нескольких шаблонов подстановочных знаков kafka, используя kafka-python?
Я подписываюсь на Kafka, используя шаблон с подстановочным знаком, как показано ниже. Подстановочный знак представляет динамический идентификатор клиента.
consumer.subscribe(pattern='customer.*.validations')
Это хорошо работает, потому что я могу вытащить идентификатор клиента из строки темы. Но теперь мне нужно расширить функциональность, чтобы прослушать похожую тему для немного другой цели. Давайте назовем это customer.*.additional-validations
, Код должен жить в одном и том же проекте, потому что используется много функциональности, но мне нужно иметь возможность выбрать другой путь в зависимости от типа очереди.
В документации Kafka я вижу, что можно подписаться на множество тем. Однако это жестко закодированные строки. Не шаблоны, которые учитывают гибкость.
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
... assert isinstance(msg.value, dict)
Поэтому мне интересно, можно ли как-нибудь сделать комбинацию из двух? Вроде как (нерабочий):
consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])
1 ответ
В коде KafkaConsumer он поддерживает список тем или шаблон,
def subscribe(self, topics=(), pattern=None, listener=None):
"""Subscribe to a list of topics, or a topic regex pattern
Partitions will be dynamically assigned via a group coordinator.
Topic subscriptions are not incremental: this list will replace the
current assignment (if there is one).
Таким образом, вы можете создать регулярное выражение с условием ИЛИ, используя |
, который должен работать как подписка на несколько динамических регулярных выражений, так как он внутренне использует re
модуль для сопоставления.
(customer.*.validations)|(customer.*.additional-validations)
В библиотеке Confluent Kafka у подписчика нет
pattern
ключевое слово, но вместо этого будет обрабатывать шаблоны регулярных выражений, начинающиеся с
^
.
def subscribe(self, topics, on_assign=None, *args, **kwargs):
"""
Set subscription to a supplied list of topics
This replaces a previous subscription.
Regexp pattern subscriptions are supported by prefixing the topic string with ``"^"``, e.g.::
consumer.subscribe(["^my_topic.*", "^another[0-9]-?[a-z]+$", "not_a_regex"])
"""