Как обеспечить блокировку записи потока Redis vertx?
У меня есть два микросервиса. Один микросервис отвечает за запись в поток Redis, а другой — за чтение из потока Redis. По независящим от меня причинам мне приходится использовать клиент Redis Vertx для записи в поток. Проблема в том, что микросервис, отвечающий за запись в Redis, блокируется до момента, когда он выполняет запись в Redis. Это вызывает некоторое неожиданное поведение, поскольку, конечно, код Redis vertx является асинхронным и может произойти в любое время. Например, см. ниже.
Микросервис Б
public void doSomething(){
//stuff being done
...
writeToRedis(message);
logger.infof("finished doing stuff");
}
public void writeToRedis(String message){
final Redis client = Redis.createClient(vertx, connectionString);
try{
client.connect()
.onSuccess(conn ->
conn.send(
Request.cmd(Command.XADD).arg(stream).arg("*").arg("payload").arg(message);
)
.onSuccess(
response ->
logger.infof("Message successfully written");
conn.close();
client.close();
).onFailure(e->logger.error("something bad happened",e)))
} catch(InterruptedException | ExecutionException e){
if(Thread.interrupted()){
Thread.currentThread().interrupt();
throw new CustomException(e);
} else{
throw new CustomException(e);
}
}
}
Из вышесказанногоfinished doing stuff
Сообщение журнала может быть записано до того, как будет написано сообщение. Я не хочу, чтобы это произошло. Я знаю, что vertx не хочет, чтобы вы блокировали цикл событий, но в этом случае мне нужно заблокировать его до тех пор, пока сообщение не будет записано. Не знаю, как это обойти.
2 ответа
Нет необходимости блокировать цикл событий. Вместоvoid
, метод должен вернуть, напримерFuture<Void>
:
public Future<Void> writeToRedis(String message) {
final Redis client = Redis.createClient(vertx, connectionString);
return client.connect()
.onSuccess(conn ->
conn.send(Request.cmd(Command.XADD).arg(stream).arg("*").arg("payload").arg(message))
.onSuccess(response -> {
logger.infof ("Message successfully written");
conn.close();
client.close();})
.onFailure(e -> logger.error("something bad happened",e));
}
Затем вы можете сделатьdoSomething()
действовать по завершению этого будущего:
public void doSomething(){
//stuff being done
...
writeToRedis(message).onSuccess(v ->
logger.infof("finished doing stuff"));
}
Всё, не надо блокировать! Кстати, вам не понадобится код обработки исключений вwriteToRedis
метод.
После некоторых исследований и обращения к команде Quarkus было предложено следующее решение:
используйте вариант мятежа и используйте .await().atMost(…)
Или используйте toCompletionStage().toCompletableFuture().join()