Использование реактивной-кафки для условной обработки сообщений

Я пытался использовать реактив-кафку, и у меня возникла проблема с условной обработкой, на которую я не нашел удовлетворительного ответа.

В основном я пытаюсь использовать одну тему кафки, которая содержит огромное количество сообщений (около 10 миллиардов сообщений в день), и обрабатывать только некоторые из этих сообщений (несколько тысяч в день) на основе некоторого свойства сообщения, затем отправьте обработанную версию моего сообщения в другую тему, и я изо всех сил пытаюсь сделать это правильно.

Моя первая попытка была что-то вроде:

// This is pseudo code.
Source(ProducerSettings(...))
    .filter(isProcessable(_))
    .map(process(_))
    .via(Producer.flow(producerSettings))
    .map(_.commitScalaDsl())
    .runWith(Sink.ignore)

Проблема с этим подходом заключается в том, что я фиксирую только когда читаю сообщения, которые могу обработать, что явно не круто, потому что, если мне нужно остановить и перезапустить программу, я должен перечитать кучу бесполезных сообщений, и так как их так много, я не могу себе этого позволить.

Затем я попытался использовать GraphDSL, делая что-то вроде:

in ~> broadcast ~> isProcessable    ~> process ~> producer ~> merge ~> commit
   ~> broadcast ~>              isNotProcessable           ~> merge

Это решение, очевидно, также не годится, потому что сообщения, которые я не могу обработать, проходят через вторую ветвь графа и фиксируются до того, как обрабатываемые сообщения действительно будут отправлены к месту назначения, что несколько хуже, чем первое сообщение, потому что оно не даже гарантированную доставку хотя бы раз.

Кто-нибудь имеет представление о том, как я мог решить эту проблему?

1 ответ

Решение

До этого я решал похожую проблему, используя порядковый номер, чтобы гарантировать порядок.

Например, вы можете создать поток, подобный тому, который вы описываете, для сохранения коммита:

in ~> broadcast ~> isProcessable ~> process ~> producer ~> merge ~> out
   ~> broadcast ~>            isNotProcessable          ~> merge

А затем оберните его в поток сохранения порядка, подобный этому (взят из библиотеки, которую мы разработали в моей компании): OrderPreservingFlow. Полученный поток затем может быть отправлен в приемник коммиттера.

Если ваш этап обработки гарантирует упорядочение, вы даже можете быть более эффективными и избежать буферизации, внедряя логику прямо в график:

in ~> injectSeqNr ~> broadcast ~> isProcessable ~> process ~> producer ~> mergeNextSeqNr ~> commit
                  ~> broadcast ~>             isNotProcessable         ~> mergeNextSeqNr

Здесь ваш mergeNextSeqNr представляет собой просто измененную стадию слияния, на которой, если вход доступен на порту 1, вы немедленно отправляете его, если его порядковый номер является ожидаемым, в противном случае вы просто ждете, пока данные будут доступны на другом порту.

Конечный результат должен быть в точности таким же, как при использовании описанного выше потока, но вы можете легче адаптировать его к своим потребностям, если встраиваете его.

Другие вопросы по тегам