Axon & CompletableFuture
Я столкнулся с проблемами при попытке использовать CompletableFuture с Axon. Например:
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
log.info("Start processing target: {}", target.toString());
return new Event();
}, threadPool);
future.thenAcceptAsync(event -> {
log.info("Send Event");
AggregateLifecycle.apply(event);
}, currentExecutor);
в thenAcceptAsync - AggregateLifecycle.apply(событие) имеет непредвиденное поведение. Некоторые из моих обработчиков @EventSourcingHandler начинают обрабатывать событие дважды. Кто-нибудь знает, как это исправить?
Я читал документы и все, что я получил, это:
В большинстве случаев DefaultUnitOfWork предоставит вам необходимую вам функциональность. Ожидается, что обработка произойдет в одном потоке.
так что, похоже, я должен каким-то образом использовать методы CurrentUnitOfWork.get/set, но все еще не могу понять Axon API.
1 ответ
Вы не должны apply()
события асинхронные. apply()
метод вызовет внутренние методы @EventSourcingHandler агрегата и запланирует событие для публикации, когда единица работы завершится (успешно). То, как Axon работает с единицей работы (которая координирует действия одного вызова обработчика сообщений), метод apply() должен вызываться в потоке, который управляет этой единицей работы.
Если вы хотите асинхронную публикацию событий, используйте шину событий, которая использует асинхронный транспорт, и используйте отслеживающие процессоры.