Реактивный обмен сообщениями Quarkus с kafka
У меня два микросервиса, производитель и потребитель. Производитель записывает приращение числа каждые две секунды в тему кафки. У потребителя есть два запущенных экземпляра, использующих эти приращения. Я заметил несколько странных вещей, которые хочу решить:
- Когда потребителей еще нет, а производитель начинает выдавать, сообщения сохраняются в кафке. Когда потребитель подключается к сети, он не обрабатывает уже существующие сообщения, которые уже были созданы производителем, а скорее начинает принимать сообщения, которые поступают сейчас. Как потребитель может также обработать все неиспользованные сообщения?
- Когда есть два потребителя, я ожидаю, что оба потребителя потребляют одинаково. Теперь только один из потребителей получает всю нагрузку, а другой просто сидит там. Как распределить нагрузку по количеству потребителей?
- Похоже, кафка сохраняет все создаваемые записи, даже если они уже потреблены потребителем. Есть ли способ предотвратить это? Я не могу найти хорошую информацию, например, о благодарностях.
Кто-нибудь знает ответ на один из этих трех квестов?
Потребительская конфигурация:
mp.messaging.incoming.stocks.connector=smallrye-kafka
mp.messaging.incoming.stocks.topic=stocks
mp.messaging.incoming.stocks.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.stocks.group.id=test1
mp.messaging.incoming.stocks.auto.offset.reset=earliest
Конфигурация производителя:
mp.messaging.outgoing.stock-quote.connector=smallrye-kafka
mp.messaging.outgoing.stock-quote.topic=stocks
mp.messaging.outgoing.stock-
quote.value.serializer=org.apache.kafka.common.serialization.StringSerializer
1 ответ
Kafka сохраняет все записи по умолчанию, поскольку классифицируется как механизм потоковой передачи событий (обмен сообщениями в реальном времени + хранилище). Конфигурация по умолчанию для хранения сообщений в теме составляет 7 дней (168 часов) и может быть изменена с помощью конфигурации темы.retention.ms=10000
(10 секунд или любое другое значение).
Для одновременного использования вам необходимо убедиться, что ваша тема разбита на разделы, поскольку разделение тем является единицей параллелизма в Kafka. Также, насколько я помню, команда разработчиков Quarkus/Smallrye находилась в процессе реализации слушателя ребалансировки Kafka. Если этот находится на своем месте с последней версией Quarkus, то оставшаяся часть предназначена для разбивки вашей темы.
Для использования сообщений перед назначением разделов потребителю нам понадобится Dev. ввод команды здесь, я не могу посоветовать этому.