Как обеспечить блокировку записи потока Redis vertx?

У меня есть два микросервиса. Один микросервис отвечает за запись в поток Redis, а другой — за чтение из потока Redis. По независящим от меня причинам мне приходится использовать клиент Redis Vertx для записи в поток. Проблема в том, что микросервис, отвечающий за запись в Redis, блокируется до момента, когда он выполняет запись в Redis. Это вызывает некоторое неожиданное поведение, поскольку, конечно, код Redis vertx является асинхронным и может произойти в любое время. Например, см. ниже.

Микросервис Б

      public void doSomething(){
   //stuff being done
   ...
   writeToRedis(message);
   logger.infof("finished doing stuff");
}

public void writeToRedis(String message){
  final Redis client = Redis.createClient(vertx, connectionString);

  try{
     client.connect()
     .onSuccess(conn -> 
       conn.send(
          Request.cmd(Command.XADD).arg(stream).arg("*").arg("payload").arg(message);
       )
       .onSuccess(
          response -> 
            logger.infof("Message successfully written");
            conn.close();
            client.close();
       ).onFailure(e->logger.error("something bad happened",e)))
  } catch(InterruptedException | ExecutionException e){
     if(Thread.interrupted()){
        Thread.currentThread().interrupt();
        throw new CustomException(e);
     } else{
        throw new CustomException(e);
     }
  }
}

Из вышесказанногоfinished doing stuffСообщение журнала может быть записано до того, как будет написано сообщение. Я не хочу, чтобы это произошло. Я знаю, что vertx не хочет, чтобы вы блокировали цикл событий, но в этом случае мне нужно заблокировать его до тех пор, пока сообщение не будет записано. Не знаю, как это обойти.

2 ответа

Нет необходимости блокировать цикл событий. Вместоvoid, метод должен вернуть, напримерFuture<Void>:

      public Future<Void> writeToRedis(String message) {
    final Redis client = Redis.createClient(vertx, connectionString);

    return client.connect()
        .onSuccess(conn -> 
            conn.send(Request.cmd(Command.XADD).arg(stream).arg("*").arg("payload").arg(message))
                .onSuccess(response -> {
                    logger.infof ("Message successfully written");
                    conn.close();
                    client.close();})
        .onFailure(e -> logger.error("something bad happened",e));
}

Затем вы можете сделатьdoSomething()действовать по завершению этого будущего:

      public void doSomething(){
    //stuff being done
    ...
    writeToRedis(message).onSuccess(v ->
        logger.infof("finished doing stuff"));
}

Всё, не надо блокировать! Кстати, вам не понадобится код обработки исключений вwriteToRedisметод.

После некоторых исследований и обращения к команде Quarkus было предложено следующее решение:

  1. используйте вариант мятежа и используйте .await().atMost(…)

  2. Или используйте toCompletionStage().toCompletableFuture().join()

Другие вопросы по тегам