Излучение пунктов прогресса при пересылке приводит к единственной наблюдаемой

Я инициирую операцию через REST API в два этапа:

  1. Начать работу и вернуть идентификатор задачи
  2. Опросите задание с заданным идентификатором и завершите последовательность, когда операция завершится.

Опрос идентификатора задачи вернет 202, который указывает, что операция все еще выполняется, и 200, когда она завершится. Любой другой код является ошибкой.

Мне нужно сообщить подписчикам ответ каждого шага.

Ранее я хотел бы, чтобы оператор do выдвигал ответ между шагами в ReplaySubject.

startReboot()
  .do(onNext: { response in
     operationStatus.next(response)
  )
  .flatMap({ response in
     // If we could not get the task ID from the response we error
     guard let taskID = getTaskIDFromJSON(response) else { return Observable.error(API.serverError) }

     return Observable.just(taskID)
  })
  .flatMap({ taskID in
      return pollTask(withID: taskID) // internally, it uses retryWhen to check the api again with a five second delay
  })
  .do(onNext: { response in
     operationStatus.next(response)
  })
  .subscribe(onError: { _ in
     showOperationFailedIcon()
  }, onCompleted: {
     showOperationCompleteIcon()
  })

А в другом месте подписчик на тему сделает следующее:

operationStatus.subscribe(onNext: { response
    showResponse(response)
})

По сути, я показываю ход операции и ответ, который мы получаем от каждого шага одновременно.

В то время я не был знаком с Rx, чтобы придумать более чистое решение. Но теперь, когда я ознакомился с ним, мне кажется, что должно быть решение, в котором мы не используем побочные эффекты и не ограничиваем его до единой финальной наблюдаемой. Тем не менее, я не могу найти способ сделать это.

Я думал о чем-то вроде этого:

let opObs = startReboot()
let pollingObs = pollTask(/* where does the id come from? */)
Observable.concat(opObs, pollingObs)
   .subscribe(onNext : { response in
     showResponse(response)
   }, onError: { _ in
     showOperationFailedIcon()
   }, onCompleted: {
     showOperationCompleteIcon()
   })

Но это будет означать, что после выполнения операции opObs мне нужно будет сохранить идентификатор задачи в переменной за пределами монады и обернуть объекты pollingObs, чтобы получить его при запуске, снова вызывая побочные эффекты.

Есть ли оператор или комбинация операторов, которые я могу использовать, чтобы передать ответ каждого шага подписчику, а также передать его другому наблюдаемому?

1 ответ

Решение

Нечто подобное должно работать. Обратите внимание на использование share() чтобы избежать двойного запуска последовательности startReboot.

let opObs = startReboot().share()
let pollingObs = opObs.flatMap {
    guard let taskID = getTaskIDFromJSON(response) else { return Observable.error(API.serverError)

    return pollTask(withID: taskID)
}
Observable.concat(opObs, pollingObs)
   .subscribe(onNext : { response in
     showResponse(response)
   }, onError: { _ in
     showOperationFailedIcon()
   }, onCompleted: {
     showOperationCompleteIcon()
   })
Другие вопросы по тегам