Повторить попытку после задержки для запланированной задачи в Combine
Я хочу запланировать API в определенный интервал времени, и если этот API дает сбой, я хочу повторить попытку этого API через определенное время, и если он достигнет максимального количества повторных попыток, он должен прекратить вызов API, но если он будет успешным, он должен продолжиться в запланированное время.
Идея:
- два потока, Timer и dataTaskPublisher.
- Таймер будет управлять dataTaskPublisher.
- если dataTaskPublisher не работает:
- таймер reschdeule с некоторой задержкой()
- если количество повторных попыток достигнуто, таймер остановки (который остановит dataTaskPublisher)
Я немного потерялся, чтобы организовать два потока. Любые предложения по образцу были бы очень полезны. Извиняюсь за пробный вариант для начинающих.
var cancellables: AnyCancellable
let interval = 2
let maxRetry = 2
let delay = 5
var timer = Timer.publish(every: TimeInterval(interval), on: .main, in: .common).autoconnect()
var session = URLSession.shared.dataTaskPublisher(for: URL(string: "https://google.com")!).map(\.data).eraseToAnyPublisher()
cancellables = timer.flatMap{ _ in
return session.catch { _ -> AnyPublisher<Data, URLError> in
return Publishers.Delay(upstream: session,
interval: .seconds(delay), tolerance: 1,
scheduler: RunLoop.main).print("retrying").retry(maxRetry).eraseToAnyPublisher()
}.eraseToAnyPublisher()
}.sink(receiveCompletion: { comp in
if case let .failure(err) = comp {
print("=============================This is error \(err)")
// This doesn't trigger.
}
}, receiveValue: { data in
print("success")
})
1 ответ
Ваше решение было близко к успеху.
После добавления обработки ошибок в dataTaskPublisher с помощью tryMap мы можем использовать издатель Fail для распространения ошибки в ReceiveCompletion. А сейчас
- Таймер управляет dataTaskPublisher.
- Если dataTaskPublisher дает сбой, мы повторяем попытку с задержкой.
- Сеанс завершается с ошибкой, если количество повторных попыток превышает maxRetry.
Я добавил переменную retry_count, чтобы убедиться, что количество ожидаемых повторных попыток не превышено. Пожалуйста, найдите новую реализацию, основанную на вашем коде ниже.
import Foundation
import Combine
var cancellables: AnyCancellable
var timer = Deferred { Just(Date()) }.append(Timer.publish(every: TimeInterval(interval), on: .main, in: .common).autoconnect())
.eraseToAnyPublisher()
let server = "http://127.0.0.1:5000"
let interval = 2
let maxRetry = 2
let delay = 5
var retry_count = 0
var session = URLSession.shared.dataTaskPublisher(for: URL(string: server)!)
.tryMap { element -> Result<URLSession.DataTaskPublisher.Output, Error> in
print("retry count \(retry_count)")
if retry_count > maxRetry + 1 {
print("max retries exceeded")
}
guard let httpResponse = element.response as? HTTPURLResponse,
httpResponse.statusCode == 200
else {
retry_count += 1
throw URLError(.badServerResponse)
}
return .success(element)
}.eraseToAnyPublisher()
cancellables = timer.flatMap { _ in
return session
.catch { (err: Error) -> AnyPublisher<Result<URLSession.DataTaskPublisher.Output, Error>, Error> in
return Fail(error: err)
.delay(for: .seconds(delay), scheduler: DispatchQueue.main)
.eraseToAnyPublisher()
}
}
.retry(maxRetry)
.sink(receiveCompletion: { comp in
if case let .failure(err) = comp {
print("=============================This is error \(err)")
// This now triggers.
}
}, receiveValue: { resp in
retry_count = 0
print("success")
})
Мы можем проверить ответ на следующем сервере Flask:
from flask import Flask
app = Flask(__name__)
@app.route("/")
def my_endpoint():
rnd = random.randint(0, 2)
if rnd == 0:
return "not found", 400
else:
return "success"
# Run with $ flask --app main.py run