Присоединение к юнитам в Quarkus/Mutiny/Hibernate-Reactive
У меня очень запутанная проблема со следующим quarkus/hibernate-reactive/mutiny. Я начну с описания функции, которую я реализую в Quarkus, используя hibernate-reactive и mutiny.
а. Задача состоит в том, чтобы получить запись из базы данных,
Uni<MyRecord> getAuthenticationRecord(String id);
б. затем используйте поле refresh_token в объекте, создайте объект запроса и передайте его стороннему API, который возвращает CallableFuture.
CompletableFuture<TokenResponse> refreshToken(final TokenRequest tokenRequest);
и, наконец, получить значения из
tokenRequest
и обновить запись, полученную на шаге а.
Я пробовал следующее:
class MyApi {
public Uni<AuthRecord> refreshToken(String owner) {
MyRecord authRecord = getAuthenticationRecord(owner); //get the authentication record
TokenResponse refreshToken = authRecord.onItem().transform(MyRecord::refreshToken)
.chain(refreshToken -> {
TokenRequest request = new TokenRequest(refreshToken); //create the request object
return Uni.createFrom().completionStage(refreshToken(request)); //convert the CallableFuture to Uni
});
//Join the unis and update the auth record
return Uni.combine().all().unis(authRecord, refreshToken).asTuple().onItem().transform(
tuplle -> {
var record = tuple.getItem1();
var refresh = tuple.getItem2();
record.setCode(refresh.getToken());
return record.persistAndFlush();
}
);
}
}
Использование его в тестовом примере:
@Inject
MyApi api;
@Test
public void test1() {
//This produces nothing
api.refreshToken("owner").subscribe().with(
item -> {
System.out.println(Json.encode(item));
}
)
}
@Test
public void test2() {
//This won't work because no transaction is active
var record = api.refreshToken("owner").await().indefinitely();
}
@Test
@ReactiveTransactional
public void test3() {
//This won't work either because the thread is blocked @Blocking annotation didn't help either
var record = api.refreshToken("owner").await().indefinitely();
}
Какие-либо предложения?
1 ответ
Вы можете протестировать реактивные приложения, используя quarkus-test-vertx:
- Добавить
quarkus-test-vertx
зависимость:<dependency> <groupId>io.quarkus</groupId> <artifactId>quarkus-test-vertx</artifactId> <scope>test</scope> </dependency>
- Теперь у вас есть доступ к
UniAsserter
:@Inject MyApi api; @Test @TestReactiveTransaction public void test1(UniAsserter asserter) { asserter.assertThat( () -> api.refreshToken("owner"), authRecord -> Assertions.assertThat(authRecord).isNotNull(); ); }
@TestReactiveTransaction
запустит тест в реактивной транзакции и откатит транзакцию в конце. Это также приведет к запуску всего теста в потоке vertx-event-loop.
Если вы не хотите откатывать транзакцию в конце, вы можете начать свою собственную транзакцию, используяPanache.withTransaction
и@RunOnVertxContext
:
@Test
@RunOnVertxContext // Makes sure that the whole test runs in a Vert.x event loop thread
public void test1(UniAsserter asserter) {
asserter.assertThat(
() -> Panache.withTransaction(() -> api.refreshToken("owner")),
authRecord -> Assertions.assertThat(authRecord).isNotNull();
);
Если я правильно понял остальную часть кода,authRecord
иrefreshToken
уже прикованы друг к другу. Я не думаю, что вам нужен Combine.all:
class MyApi {
public Uni<AuthRecord> refreshToken(String owner) {
return getAuthenticationRecord(owner)
.chain(authRecord -> authRecord
.map(MyRecord::refreshToken)
.chain(refreshToken -> {
TokenRequest request = new TokenRequest(refreshToken); //create the request object
return Uni.createFrom().completionStage(refreshToken(request)); //convert the CallableFuture to Uni
})
.chain( refreshToken -> {
authRecord.setCode(refresh.getToken());
return authRecord.persistAndFlush();
})
);
}
}
И используя ссылку на метод, это становится:
return getAuthenticationRecord(owner)
.chain(authRecord -> authRecord
.map(MyRecord::refreshToken)
.map(TokenRequest::new)
.map(this::refreshToken)
.chain(Uni.createFrom()::completionStage)
.chain(refreshToken -> {
authRecord.setCode(refresh.getToken());
return authRecord.persistAndFlush();
})
);
}