Использование реактивной-кафки для условной обработки сообщений
Я пытался использовать реактив-кафку, и у меня возникла проблема с условной обработкой, на которую я не нашел удовлетворительного ответа.
В основном я пытаюсь использовать одну тему кафки, которая содержит огромное количество сообщений (около 10 миллиардов сообщений в день), и обрабатывать только некоторые из этих сообщений (несколько тысяч в день) на основе некоторого свойства сообщения, затем отправьте обработанную версию моего сообщения в другую тему, и я изо всех сил пытаюсь сделать это правильно.
Моя первая попытка была что-то вроде:
// This is pseudo code.
Source(ProducerSettings(...))
.filter(isProcessable(_))
.map(process(_))
.via(Producer.flow(producerSettings))
.map(_.commitScalaDsl())
.runWith(Sink.ignore)
Проблема с этим подходом заключается в том, что я фиксирую только когда читаю сообщения, которые могу обработать, что явно не круто, потому что, если мне нужно остановить и перезапустить программу, я должен перечитать кучу бесполезных сообщений, и так как их так много, я не могу себе этого позволить.
Затем я попытался использовать GraphDSL, делая что-то вроде:
in ~> broadcast ~> isProcessable ~> process ~> producer ~> merge ~> commit
~> broadcast ~> isNotProcessable ~> merge
Это решение, очевидно, также не годится, потому что сообщения, которые я не могу обработать, проходят через вторую ветвь графа и фиксируются до того, как обрабатываемые сообщения действительно будут отправлены к месту назначения, что несколько хуже, чем первое сообщение, потому что оно не даже гарантированную доставку хотя бы раз.
Кто-нибудь имеет представление о том, как я мог решить эту проблему?
1 ответ
До этого я решал похожую проблему, используя порядковый номер, чтобы гарантировать порядок.
Например, вы можете создать поток, подобный тому, который вы описываете, для сохранения коммита:
in ~> broadcast ~> isProcessable ~> process ~> producer ~> merge ~> out
~> broadcast ~> isNotProcessable ~> merge
А затем оберните его в поток сохранения порядка, подобный этому (взят из библиотеки, которую мы разработали в моей компании): OrderPreservingFlow. Полученный поток затем может быть отправлен в приемник коммиттера.
Если ваш этап обработки гарантирует упорядочение, вы даже можете быть более эффективными и избежать буферизации, внедряя логику прямо в график:
in ~> injectSeqNr ~> broadcast ~> isProcessable ~> process ~> producer ~> mergeNextSeqNr ~> commit
~> broadcast ~> isNotProcessable ~> mergeNextSeqNr
Здесь ваш mergeNextSeqNr представляет собой просто измененную стадию слияния, на которой, если вход доступен на порту 1, вы немедленно отправляете его, если его порядковый номер является ожидаемым, в противном случае вы просто ждете, пока данные будут доступны на другом порту.
Конечный результат должен быть в точности таким же, как при использовании описанного выше потока, но вы можете легче адаптировать его к своим потребностям, если встраиваете его.