Как правильно выполнять микросервисы, основанные на событиях, с кваркусом и smallrye
Уважаемые, я пытаюсь создать какие-то микросервисы, управляемые событиями. В настоящее время мне удалось получить сообщение от Kafka и обновить запись в базе данных, когда сообщение получено с помощью расширения обмена сообщениями Quarkus & Smallrye-Reactive. В дальнейшем я хочу добиться того, чтобы в случае успеха можно было отправить сообщение в другую тему, а в противном случае - в тему ошибки. Я знаю, что мы можем использовать аннотацию return и @outgoing для отправки нового сообщения, но я не думаю, что это подойдет для моего варианта использования. Здесь мне нужно руководство, если при использовании сообщения произойдет ошибка. Должен ли я вернуть сообщение в исходную тему (не подтверждая сообщение) или я должен использовать его и выдать сообщение об ошибке в другую тему для отката исходной транзакции.
Вот мой код:
@Incoming("new-payment")
public void newMessage(String msg) {
LOG.info("New payment has been received.");
LOG.info("Payload is {}", msg);
PaymentEvent pe = jsob.fromJson(msg, PaymentEvent.class);
mysqlPool.preparedQuery("select totalBuyers from Book where isbn = ? ",
Tuple.of(pe.getIsbn()))
.thenApply(rs -> {
RowIterator<Row> iterator = rs.iterator();
if(iterator.hasNext()) {
return iterator.next().getInteger(0) + 1;
} else {
return Integer.valueOf(0);
}
})
.thenApply(totalCount -> {
return mysqlPool.preparedQuery("update Book set totalBuyers = ?",
Tuple.of(totalCount));
})
.whenComplete((rs, err) -> {
if(err != null) {
//Emit an error to error topic.
} else {
//Emit a msg to other service.
}
});
}
Также, если у вас есть лучший код, отправьте, я все еще новичок в реактивном программировании:).
2 ответа
Я занимаюсь корпоративной интеграцией в течение многих лет и думаю, что вы захотите сделать и то, и другое.
Должен ли я вернуть сообщение в исходную тему (не подтверждая сообщение) или я должен использовать его и выдать сообщение об ошибке в другую тему, чтобы откатить исходную транзакцию.
Событие должно оставаться в теме, чтобы другой экземпляр мог его подобрать и обработать. И сообщение об ошибке должно быть зарегистрировано как событие. Возможно, тот же потребитель сможет успешно обработать событие и обработать его.
EDA (архитектура, управляемая событиями) может предлагать различные способы решения этой проблемы, но на ESB сообщение будет помечено как проверенное. Обычно после трех попыток он попадает в очередь недоставленных сообщений, чтобы его можно было исправить и повторно обработать позже.
Наше предприятие также начинает разрабатывать и создавать приложения с использованием EDA, поэтому мне интересно узнать, что другие говорят по этому вопросу. И СПАСИБО за то, что сосредоточились на Quarkus. Я считаю, что это одна из лучших технологий Redhat, которые я когда-либо видел!
Другая проблема с этим подходом заключается в том, что вы выполняете "2 записи в одной службе", например, один вызов базы данных, а другой - тему. И это может стать проблемой, если одна из двух записей не удалась.
Если вы хотите избежать этого и использовать подход, основанный исключительно на событиях, вам нужно переупорядочить свои события таким образом, чтобы запись в базу данных была последним событием во всем потоке, чтобы вы могли предотвратить 2 записи из 1 службы.
Таким образом, в вашем случае: измените второй метод thenApply(..) с обновления базы данных на запуск нового события в другой теме. И пользователь этой новой темы должен выполнить обновление базы данных. Таким образом, поток становится таким:
Производитель -> тема1 -> потребитель (выберите из...) и запускает событие в другую тему -> тема2 -> потребитель (таблица обновлений).