Нет отката с кваркусом, бунтом и реактивным postgresql
Я пытаюсь выполнить 3 вставки в рамках одной транзакции, но не могу выполнить откат транзакции, если одна из вставок завершилась неудачно.
Я новичок в реактивном мире, и это мое самое первое реактивное приложение.
Вот упрощение модели базы данных:
EntityA 1---N EntityB
EntityA 1---N EntityC
Я хочу выполнить следующие вставки в одной транзакции:
INSERT INTO A
INSERT INTO B --(failing query)
INSERT INTO C
Но, когда вторая вставка терпит неудачу, первая вставка не откатывается.
У меня есть следующие классы:
Processor
: получает сообщение от кафки и запускает вставки через службуService
: запускает 3 вставки с использованием 3 DAOEntityADao
: запускает вставку объекта AEntityBDao
: запускает вставку объекта BEntityBDao
: запускает вставку объекта C
@ApplicationScoped
public class Processor {
private final Service service;
public Processor(final Service service) {
this.service = service;
}
@Incoming("input-channel")
@Outgoing("output-channel")
public Uni<Message<RequestMessage>> process(final Message<RequestMessage> message) {
final RequestMessage rm = message.getPayload();
return service.saveEntities(rm)
.onFailure()
.recoverWithItem(e -> {
final String errorMessage = "There was an unexpected error while saving entities";
LOG.error(errorMessage, e);
return Result.KO;
})
.flatMap(result -> {
rm.setResult(result);
return Uni.createFrom()
.item(Message.of(rm), message::ack))
});
}
}
@ApplicationScoped
public class WorkerService {
private final EntityADao entityADao;
private final EntityBDao entityBDao;
private final EntityCDao entityCDao;
public WorkerService(final EntityADao entityADao,
final EntityBDao entityBDao,
final EntityCDao entityCDao) {
this.entityADao = entityADao;
this.entityBDao = entityBDao;
this.entityCDao = entityCDao;
}
@Transactional(TxType.REQUIRED)
public Uni<Result> saveEntities(final RequestMessage requestMessage) {
return Uni.createFrom().item(Result.OK)
// Save Entity A
.flatMap(result -> {
LOG.debug("(1) Saving EntityA ...");
return entityADao.save(requestMessage.getEntityAData());
})
// Save Entity B
.flatMap(result -> {
LOG.debug("(2) Saving EntityB ...");
return entityBDao.save(requestMessage.getEntityBData());
})
// Save Entity C
.flatMap(result -> {
LOG.debug("(3) Saving EntityC ...");
return entityCDao.dao(requestMessage.getEntityCData());
})
// Return OK
.flatMap(result -> Uni.createFrom().item(Result.OK));
}
}
@ApplicationScoped
public class EntityADao {
private final PgPool client;
public EntityADao(final PgPool client) {
this.client = client;
}
@Transactional(TxType.MANDATORY)
public Uni<Result> save(final EntityAData entityAData) {
return client
.preparedQuery(
"INSERT INTO A(col1, col2, col3) " +
"VALUES ($1, $2, $3)")
.execute(Tuple.of(entityAData.col1(), entityAData.col2(), entityAData.col3()))
.flatMap(pgRowSet -> {
LOG.debug("Inserted EntityA!");
return Result.OK;
});
}
}
EntityBDao
а также EntityCDao
похожи EntityADao
.
Я уже добавил следующие зависимости в pom.xml
:
quarkus-smallrye-context-propagation
quarkus-narayana-jta
Почему, когда INSERT B
запрос в EntityBDao
не выполняется, ранее выполненный запрос (INSERT A
) не откатывается? Что мне не хватает? Что мне нужно изменить, чтобы это работало?
1 ответ
Этот абзац, недавно добавленный в нашу документацию Quarkus, должен помочь вам в этом: https://quarkus.io/guides/reactive-sql-clients.
В нем конкретно объясняется, как работать с транзакциями при использовании клиентов Reactive SQL.