Присоединение к юнитам в Quarkus/Mutiny/Hibernate-Reactive

У меня очень запутанная проблема со следующим quarkus/hibernate-reactive/mutiny. Я начну с описания функции, которую я реализую в Quarkus, используя hibernate-reactive и mutiny.

а. Задача состоит в том, чтобы получить запись из базы данных,

        Uni<MyRecord> getAuthenticationRecord(String id);

б. затем используйте поле refresh_token в объекте, создайте объект запроса и передайте его стороннему API, который возвращает CallableFuture.

        CompletableFuture<TokenResponse> refreshToken(final TokenRequest tokenRequest);

и, наконец, получить значения из tokenRequestи обновить запись, полученную на шаге а.

Я пробовал следующее:

      class MyApi {
  public Uni<AuthRecord> refreshToken(String owner) {

    MyRecord authRecord = getAuthenticationRecord(owner); //get the authentication record

    TokenResponse refreshToken = authRecord.onItem().transform(MyRecord::refreshToken)
    .chain(refreshToken -> {
        TokenRequest request = new TokenRequest(refreshToken); //create the request object
        return Uni.createFrom().completionStage(refreshToken(request)); //convert the CallableFuture to Uni 
    });


    //Join the unis and update the auth record
    return Uni.combine().all().unis(authRecord, refreshToken).asTuple().onItem().transform( 
      tuplle -> {
        var record = tuple.getItem1();
        var refresh = tuple.getItem2();

        record.setCode(refresh.getToken());
        return record.persistAndFlush();
      }
    );
  }
}

Использование его в тестовом примере:

      @Inject
MyApi api;

@Test 
public void test1() {
  //This produces nothing
  api.refreshToken("owner").subscribe().with(
    item -> { 
      System.out.println(Json.encode(item));
    }
  )
}

@Test 
public void test2() {
  //This won't work because no transaction is active
  var record = api.refreshToken("owner").await().indefinitely();

}

@Test 
@ReactiveTransactional
public void test3() {
  //This won't work either because the thread is blocked @Blocking annotation didn't help either 
  var record = api.refreshToken("owner").await().indefinitely();

}

Какие-либо предложения?

1 ответ

Вы можете протестировать реактивные приложения, используя quarkus-test-vertx:

  1. Добавитьquarkus-test-vertxзависимость:
            <dependency>
       <groupId>io.quarkus</groupId>
       <artifactId>quarkus-test-vertx</artifactId>
       <scope>test</scope>
    </dependency>
    
  2. Теперь у вас есть доступ кUniAsserter:
            @Inject
    MyApi api;
    
    @Test 
    @TestReactiveTransaction
    public void test1(UniAsserter asserter) {
     asserter.assertThat(
         () -> api.refreshToken("owner"),
         authRecord -> Assertions.assertThat(authRecord).isNotNull();
     ); 
    }
    

@TestReactiveTransactionзапустит тест в реактивной транзакции и откатит транзакцию в конце. Это также приведет к запуску всего теста в потоке vertx-event-loop.

Если вы не хотите откатывать транзакцию в конце, вы можете начать свою собственную транзакцию, используяPanache.withTransactionи@RunOnVertxContext:

         @Test 
   @RunOnVertxContext // Makes sure that the whole test runs in a Vert.x event loop thread
   public void test1(UniAsserter asserter) {
    asserter.assertThat(
        () -> Panache.withTransaction(() -> api.refreshToken("owner")),
        authRecord -> Assertions.assertThat(authRecord).isNotNull();
    ); 

Если я правильно понял остальную часть кода,authRecordиrefreshTokenуже прикованы друг к другу. Я не думаю, что вам нужен Combine.all:

      class MyApi {
  public Uni<AuthRecord> refreshToken(String owner) {
    return getAuthenticationRecord(owner)
              .chain(authRecord -> authRecord
                  .map(MyRecord::refreshToken)
                  .chain(refreshToken -> {
                      TokenRequest request = new TokenRequest(refreshToken); //create the request object
                      return Uni.createFrom().completionStage(refreshToken(request)); //convert the CallableFuture to Uni 
                  })
                  .chain( refreshToken -> {
                      authRecord.setCode(refresh.getToken());
                      return authRecord.persistAndFlush();
                  })
              );
  }
}

И используя ссылку на метод, это становится:

          return getAuthenticationRecord(owner)
              .chain(authRecord -> authRecord
                  .map(MyRecord::refreshToken)
                  .map(TokenRequest::new)
                  .map(this::refreshToken)
                  .chain(Uni.createFrom()::completionStage)
                  .chain(refreshToken -> {
                      authRecord.setCode(refresh.getToken());
                      return authRecord.persistAndFlush();
                  })
              );
   }
Другие вопросы по тегам