Обработка максимального количества задач на одного потребителя в теме за заданное временное окно?

Мой продюсер создает n задачи из одного входного сообщения и публикует их на topic.

Требование состоит в том, чтобы из всех индивидуальных потребителей в группе потребителей topic, ни один из них не должен обрабатывать более трех из этих n задачи в течение 1 часа.

Это означает, что если я хочу обработать все эти сообщения немедленно, мне нужно как минимум ceil(n/3)потребители. Если меньше чемceil(n/3) потребителей, то мне нужен способ отложить сообщение, пока не будет num_processed < 3 за последний час.

Что касается практических аспектов реализации этого решения, я надеюсь использовать Kafka с Faust [1], но при необходимости у меня также есть доступ к Redis.

Моя идея до сих пор заключалась в том, чтобы обеспечить как минимум ceil(n/3) потребителей при производстве, а затем просто использовать циклическое распределение задач topicот производителя. В любом случае это оптимальное решение, поскольку оно избавляет от необходимости ждать до 1 часа для обработки сообщений. Однако это будет работать только до тех пор, пока не умрет достаточное количество потребителей, после чего один и тот же потребитель сможет обработать более трех, скорее всего, в течение 1 часа. Это неприемлемо.

Другая идея может заключаться в том, чтобы потребители проверяли каждый раз, когда они получают сообщение, выполнили ли они 3 из n задачи уже, и если да, то каким-то образом запросите, чтобы другой потребитель работал с ним, но я не смог найти в Kafka какой-либо подходящий механизм, чтобы включить это.

[1] https://faust.readthedocs.io/

1 ответ

Что-то, что потребует немного больше усилий, но значительно упростит фактическую обработку, - это наличие предварительного потребителя, который просто ждал бы, пока не появятся 3 сообщения, которые нужно обработать, упаковать их в "мета-сообщение" и отправить что в тему "готовая к обработке". Он должен быть похож на упомянутый @cricket_007, он не должен фиксироваться до тех пор, пока на самом деле не потребит 3 сообщения и не создаст их в исходящей теме.

Таким образом, конечный потребитель будет чрезвычайно простым. Он просто потреблял бы данные из темы "готово к обработке", и каждый раз, когда он получает сообщение, вы знаете, что у него будут 3 события, которые вам нужны. Вы просто обрабатываете их и ждете еще час, пока снова не сможете опросить. Нет необходимости координировать свои действия с другими потребителями.

Другие вопросы по тегам