Добавление потока для записи подписчику Kafka

Мне нужно построить следующий график:

val graph = getFromTopic1 ~> doSomeWork ~> writeToTopic2 ~> commitOffsetForTopic1

но попытка реализовать его в Reactive Kafka заставила меня спуститься в кроличью нору. И это кажется неправильным, потому что это кажется мне довольно распространенным случаем: я хочу перемещать данные между темами Kafka, гарантируя при этом семантику доставки по крайней мере один раз.

Теперь совсем не проблема писать параллельно

val fanOut = new Broadcast(2)
val graph = getFromTopic1 ~> doSomeWork ~> fanOut ~> writeToTopic2
                                           fanOut ~> commitOffsetForTopic1

Этот код работает, потому что writeToTopic2 может быть реализован с ReactiveKafka#publish(..), который возвращает Sink, Но тогда я теряю гарантии ALOS и, следовательно, данные, когда мое приложение падает.

Так что мне действительно нужно написать поток, который пишет в тему Кафки. Я пытался использовать Flow.fromSinkAndSource(..) с обычаем GraphStage но столкнуться с проблемами типа для данных, проходящих через; например, что совершается в commitOffsetForTopic1 не должны быть включены в writeToTopic2Это означает, что я должен постоянно сохранять объект-оболочку, содержащий обе части данных. Но это противоречит требованиям, которые writeToTopic2 принять ProducerMessage[K,V], Моя последняя попытка решить эту проблему натолкнулась на частные и конечные классы в реактивной библиотеке kafka (расширение / обертывание / замена базового SubscriptionActor).

Я действительно не хочу поддерживать вилку, чтобы это произошло. Что мне не хватает? Почему это так сложно? Пытаюсь ли я как-то построить узел патологического графа, или этот вариант использования является упущением... или есть что-то совершенно очевидное, что я как-то упустил в документах и ​​исходном коде, которые я копал?

Текущая версия 0.10.1. Я могу добавить более подробную информацию о любой из моих многочисленных попыток по запросу.

0 ответов

Другие вопросы по тегам