Как отправить сообщение на Кафку
Я новичок в реактивном программировании и пытаюсь реализовать самый простой сценарий. Я хочу отправлять сообщение в kafka каждый раз, когда файл помещается в определенную папку. Я думаю, что плохо понимаю основы... так что, пожалуйста, не могли бы вы мне помочь?
Итак, у меня есть несколько вопросов: в чем разница между smallrye-reactive-messaging и smallrye-reactive-streams-операторами?
У меня есть такой простой код:
@Outgoing( "my-topic" )
public PublisherBuilder<Message<MessageWrapper>> generate() {
if(Objects.isNull(currentMessage)){
//currentMessage is an instance variable which is null when I start the application
return ReactiveStreams.of(new MessageWrapper()).map(Message::of);
}
else {
//currentMessage has been correctly set with the file information
LOGGER.info(currentMessage);
return ReactiveStreams.of(currentMessage).map(Message::of);
}
}
Когда код входит в оператор if, все в порядке, и я получил сериализацию JSON моего объекта с нулевыми значениями. Однако я не понимаю, почему, когда мой код переходит в оператор else, ничего не попадает в тему? Кажется, что инструкции.of оператора if нарушили потоки или что-то в этом роде...
Как сохранить непрерывные потоки, которые "реагируют" на новые отброшенные файлы? (или другие события, такие как HTTP-запрос GET или что-то в этом роде) ...
Если я не верну экземпляр PublisherBuilder, например, Integer, тогда моя тема kafka будет заполнена очень огромным потоком целочисленных значений. Вот почему в примерах используются некоторые интервалы при отправке сообщений...
Что мне следует использовать: CompletationStage или CompletableFuture? RxJAva2? Немного запутанно, какую библиотеку использовать (vertx, smallrye, rxjava2, microprofile, ...)
В чем разница между:
- ReactiveStreams.fromCompletionStage
- ReactiveStreams.fromProcessor
- ReactiveStreams.fromPublisher
- ReactiveStreams.fromSubscriber
Какой использовать в каком сценарии?
Большое спасибо!
1 ответ
Начнем с разницы между операторами smallrye-reactive-messaging и smallrye-reactive-streams: smallrye-reactive-streams-Operator - это то же самое, что smallrye-reactive-messaging, но, кроме того, он поддерживает распространение контекста MicroProfile. Поскольку большинство провайдеров реактивного обмена сообщениями используют Vert.x за сценой, он будет обрабатывать ваше сообщение в стиле цикла событий, что означает, что оно будет выполняться в отдельном потоке. Иногда вам нужно распространить некоторый ctx из вашего базового потока в новый поток (например, заполнение контекста CDI и Tx для выполнения некоторой логики диспетчера сущностей JPA). Вот где помощь по распространению ctx.
Для сигнатур методов. Вы можете взглянуть на официальную документацию SmallRye-reactive-streams, разделы 3,4 и 5. У каждого из них свой сценарий использования. Вам решать, какой аромат вы хотите использовать.
Когда что использовать? Если вы не работаете в реактивном контексте, вы можете использовать нижеприведенное для отправки сообщений.
@Inject @Channel("мой-канал") Emitter emitter;
Для потребления сообщений вы можете использовать подпись метода следующим образом:
@Incoming("канал-2") public CompletionStage doSomething(сообщение anEvent)
Или
@Incoming("канал-2") public void doSomething(String anEvent)
Надеюсь, это поможет.