Повторная попытка Monix Task - почему здесь требуется Task.defer?

Недавно я обнаружил случай, который я не могу полностью понять, работая с Monix Task:

Есть две функции (в очереди обработчик сообщений):

  def handle(msg: RollbackMsg): Task[Unit] = {
    logger.info(s"Attempting to rollback transaction ${msg.lockId}")
    Task.defer(doRollback(msg)).onErrorRestart(5).foreachL { _ =>
      logger.info(s"Transaction ${msg.lockId} rolled back")
    }
  }

  private def doRollback(msg: RollbackMsg): Task[Unit] =
    (for {
      originalLock         <- findOrigLock(msg.lockId)
      existingClearanceOpt <- findExistingClearance(originalLock)
      _                    <- clearLock(originalLock, existingClearanceOpt)
    } yield ()).transact(xa)

Внутренние органы doRollbackдля понимания - это набор повторяющихся звонков ConnectionIO[_] монада, а затем transact работает на нем, превращая композицию в Monix Task,

Теперь, как видно из handle Функция Я хотел бы, чтобы весь процесс повторить 5 раз в случае сбоя. Загадочная часть в том, что этот простой вызов:

doRollback(msg).onErrorRestart(5)

действительно не перезапускает операцию при исключении (проверено в тестах). Чтобы получить такое поведение повтора, я должен явно обернуть его в Task.deferили уже в пределах Task "контекст" любым другим способом.

И это точка, которую я не полностью понимаю: почему это так? doRollback уже дает мне Task экземпляр, поэтому я должен быть в состоянии позвонить onErrorRestart на это нет? Если это не так, как я могу быть уверен, что Task Экземпляр, который я получаю из "где-то", можно перезапустить или нет?

Что мне здесь не хватает?

0 ответов

Другие вопросы по тегам