Почему мой потребитель apache kafka случайно игнорирует сообщения в очереди?
Это, вероятно, eisenbug, поэтому я не жду жестких ответов, но больше подсказок о том, что искать, чтобы иметь возможность повторить ошибку.
У меня есть управляемая событиями, основанная на Кафке система, состоящая из нескольких сервисов. На данный момент они организованы в линейные трубопроводы. Одна тема, один тип события. Каждый сервис можно рассматривать как преобразование одного типа события в один или несколько типов событий.
Каждое преобразование выполняется как процесс Python, со своим потребителем и своим производителем. Все они используют один и тот же код и конфигурацию, потому что все это абстрагировано от реализации сервиса.
Теперь в чем проблема. В нашей промежуточной среде иногда (скажем, одно из каждых пятидесяти сообщений) на Кафке есть сообщение, но потребитель его вообще не обрабатывает. Даже если вы ждете часы, он просто зависает. Это не происходит в местных условиях, и мы не смогли воспроизвести его где-либо еще.
Немного более актуальной информации:
- эти службы часто перезапускаются для целей отладки, но зависание не похоже на перезапуск.
- Когда сообщение зависает и вы перезапускаете службу, служба обработает сообщение.
- Сервисы полностью не сохраняют состояние, поэтому кеширование или другие странные вещи не происходят (надеюсь)
- Когда это происходит, у меня есть доказательства того, что они все еще не обрабатывают более старые сообщения (я регистрирую, когда они производят вывод, и это происходит прямо перед концом цикла потребителя)
- При текущем развертывании в группе потребителей только один потребитель, поэтому параллельная обработка внутри одних и тех же служб отсутствует, горизонтальное масштабирование службы отсутствует.
Как я потребляю:
Я использую пикафку и это потребительский цикл:
def create_consumer(self):
consumer = self.client.topics[bytes(self.input_topic, "UTF-8")].get_simple_consumer(
consumer_group=bytes(self.consumer_group, "UTF-8"),
auto_commit_enable=True,
offsets_commit_max_retries=self.service_config.kafka_offsets_commit_max_retries,
)
return consumer
def run(self):
consumer = self.create_consumer()
while not self.stop_event.wait(1):
message = consumer.consume()
results = self._process_message(message)
self.output_results(results)
Я предполагаю, что или есть какая-то проблема с тем, как я потребляю сообщения, или есть некоторое несовместимое состояние смещений группы потребителей, но я не могу действительно сосредоточиться на проблеме.
Я также собираюсь переехать в Фауст, чтобы решить проблему. Учитывая мою кодовую базу и мое архитектурное решение, переход не должен быть слишком сложным, но прежде чем начать такую работу, я хотел бы быть уверен, что я иду в правильном направлении. Прямо сейчас это был бы слепой выстрел, надеющийся, что некоторая деталь, которая создает проблему, уйдет.