Добавление потока для записи подписчику Kafka
Мне нужно построить следующий график:
val graph = getFromTopic1 ~> doSomeWork ~> writeToTopic2 ~> commitOffsetForTopic1
но попытка реализовать его в Reactive Kafka заставила меня спуститься в кроличью нору. И это кажется неправильным, потому что это кажется мне довольно распространенным случаем: я хочу перемещать данные между темами Kafka, гарантируя при этом семантику доставки по крайней мере один раз.
Теперь совсем не проблема писать параллельно
val fanOut = new Broadcast(2)
val graph = getFromTopic1 ~> doSomeWork ~> fanOut ~> writeToTopic2
fanOut ~> commitOffsetForTopic1
Этот код работает, потому что writeToTopic2
может быть реализован с ReactiveKafka#publish(..)
, который возвращает Sink
, Но тогда я теряю гарантии ALOS и, следовательно, данные, когда мое приложение падает.
Так что мне действительно нужно написать поток, который пишет в тему Кафки. Я пытался использовать Flow.fromSinkAndSource(..)
с обычаем GraphStage
но столкнуться с проблемами типа для данных, проходящих через; например, что совершается в commitOffsetForTopic1
не должны быть включены в writeToTopic2
Это означает, что я должен постоянно сохранять объект-оболочку, содержащий обе части данных. Но это противоречит требованиям, которые writeToTopic2
принять ProducerMessage[K,V]
, Моя последняя попытка решить эту проблему натолкнулась на частные и конечные классы в реактивной библиотеке kafka (расширение / обертывание / замена базового SubscriptionActor).
Я действительно не хочу поддерживать вилку, чтобы это произошло. Что мне не хватает? Почему это так сложно? Пытаюсь ли я как-то построить узел патологического графа, или этот вариант использования является упущением... или есть что-то совершенно очевидное, что я как-то упустил в документах и исходном коде, которые я копал?
Текущая версия 0.10.1. Я могу добавить более подробную информацию о любой из моих многочисленных попыток по запросу.