Axon & CompletableFuture

Я столкнулся с проблемами при попытке использовать CompletableFuture с Axon. Например:

CompletableFuture future = CompletableFuture.supplyAsync(() -> {

            log.info("Start processing target: {}", target.toString());
            return new Event();

        }, threadPool);

future.thenAcceptAsync(event -> {
            log.info("Send Event");
            AggregateLifecycle.apply(event);
}, currentExecutor);

в thenAcceptAsync - AggregateLifecycle.apply(событие) имеет непредвиденное поведение. Некоторые из моих обработчиков @EventSourcingHandler начинают обрабатывать событие дважды. Кто-нибудь знает, как это исправить?

Я читал документы и все, что я получил, это:

В большинстве случаев DefaultUnitOfWork предоставит вам необходимую вам функциональность. Ожидается, что обработка произойдет в одном потоке.

так что, похоже, я должен каким-то образом использовать методы CurrentUnitOfWork.get/set, но все еще не могу понять Axon API.

1 ответ

Решение

Вы не должны apply() события асинхронные. apply() метод вызовет внутренние методы @EventSourcingHandler агрегата и запланирует событие для публикации, когда единица работы завершится (успешно). То, как Axon работает с единицей работы (которая координирует действия одного вызова обработчика сообщений), метод apply() должен вызываться в потоке, который управляет этой единицей работы.

Если вы хотите асинхронную публикацию событий, используйте шину событий, которая использует асинхронный транспорт, и используйте отслеживающие процессоры.

Другие вопросы по тегам