Как отправить сообщение на Кафку

Я новичок в реактивном программировании и пытаюсь реализовать самый простой сценарий. Я хочу отправлять сообщение в kafka каждый раз, когда файл помещается в определенную папку. Я думаю, что плохо понимаю основы... так что, пожалуйста, не могли бы вы мне помочь?

Итак, у меня есть несколько вопросов: в чем разница между smallrye-reactive-messaging и smallrye-reactive-streams-операторами?

У меня есть такой простой код:

@Outgoing( "my-topic" )
public PublisherBuilder<Message<MessageWrapper>> generate() {
     if(Objects.isNull(currentMessage)){
          //currentMessage is an instance variable which is null when I start the application
          return ReactiveStreams.of(new MessageWrapper()).map(Message::of);
     }
     else {
          //currentMessage has been correctly set with the file information
          LOGGER.info(currentMessage);
          return ReactiveStreams.of(currentMessage).map(Message::of);
     }
}

Когда код входит в оператор if, все в порядке, и я получил сериализацию JSON моего объекта с нулевыми значениями. Однако я не понимаю, почему, когда мой код переходит в оператор else, ничего не попадает в тему? Кажется, что инструкции.of оператора if нарушили потоки или что-то в этом роде...

Как сохранить непрерывные потоки, которые "реагируют" на новые отброшенные файлы? (или другие события, такие как HTTP-запрос GET или что-то в этом роде) ...

Если я не верну экземпляр PublisherBuilder, например, Integer, тогда моя тема kafka будет заполнена очень огромным потоком целочисленных значений. Вот почему в примерах используются некоторые интервалы при отправке сообщений...

Что мне следует использовать: CompletationStage или CompletableFuture? RxJAva2? Немного запутанно, какую библиотеку использовать (vertx, smallrye, rxjava2, microprofile, ...)

В чем разница между:

  • ReactiveStreams.fromCompletionStage
  • ReactiveStreams.fromProcessor
  • ReactiveStreams.fromPublisher
  • ReactiveStreams.fromSubscriber

Какой использовать в каком сценарии?

Большое спасибо!

1 ответ

Начнем с разницы между операторами smallrye-reactive-messaging и smallrye-reactive-streams: smallrye-reactive-streams-Operator - это то же самое, что smallrye-reactive-messaging, но, кроме того, он поддерживает распространение контекста MicroProfile. Поскольку большинство провайдеров реактивного обмена сообщениями используют Vert.x за сценой, он будет обрабатывать ваше сообщение в стиле цикла событий, что означает, что оно будет выполняться в отдельном потоке. Иногда вам нужно распространить некоторый ctx из вашего базового потока в новый поток (например, заполнение контекста CDI и Tx для выполнения некоторой логики диспетчера сущностей JPA). Вот где помощь по распространению ctx.

Для сигнатур методов. Вы можете взглянуть на официальную документацию SmallRye-reactive-streams, разделы 3,4 и 5. У каждого из них свой сценарий использования. Вам решать, какой аромат вы хотите использовать.

Когда что использовать? Если вы не работаете в реактивном контексте, вы можете использовать нижеприведенное для отправки сообщений.

@Inject @Channel("мой-канал") Emitter emitter;

Для потребления сообщений вы можете использовать подпись метода следующим образом:

@Incoming("канал-2") public CompletionStage doSomething(сообщение anEvent)

Или

@Incoming("канал-2") public void doSomething(String anEvent)

Надеюсь, это поможет.

Другие вопросы по тегам