Создание и использование курсора с Vapor 3
Это может быть банка червей, я сделаю все возможное, чтобы описать проблему. У нас долгая работа по обработке данных. Наша база данных действий добавляется в ночное время, а оставшиеся действия обрабатываются. Обработка ночных действий занимает около 15 минут. В Vapor 2 мы использовали много необработанных запросов для создания курсора PostgreSQL и проходили по нему до тех пор, пока он не стал пустым.
В настоящее время мы запускаем обработку через параметр командной строки. В будущем мы хотим, чтобы он работал как часть основного сервера, чтобы можно было проверять ход выполнения во время обработки.
func run(using context: CommandContext) throws -> Future<Void> {
let table = "\"RecRegAction\""
let cursorName = "\"action_cursor\""
let chunkSize = 10_000
return context.container.withNewConnection(to: .psql) { connection in
return PostgreSQLDatabase.transactionExecute({ connection -> Future<Int> in
return connection.simpleQuery("DECLARE \(cursorName) CURSOR FOR SELECT * FROM \(table)").map { result in
var totalResults = 0
var finished : Bool = false
while !finished {
let results = try connection.raw("FETCH \(chunkSize) FROM \(cursorName)").all(decoding: RecRegAction.self).wait()
if results.count > 0 {
totalResults += results.count
print(totalResults)
// Obviously we do our processing here
}
else {
finished = true
}
}
return totalResults
}
}, on: connection)
}.transform(to: ())
}
Теперь это не работает, потому что я вызываю wait() и получаю сообщение об ошибке "Предварительное условие не выполнено: wait() не должно вызываться при EventLoop", что достаточно справедливо. Одна из проблем, с которыми я сталкиваюсь, заключается в том, что я понятия не имею, как вы вообще выходите из основного цикла событий, чтобы запускать подобные вещи в фоновом потоке. Мне известно о BlockingIOThreadPool, но он все еще работает с тем же EventLoop и все еще вызывает ошибку. Несмотря на то, что я могу теоретизировать все более и более сложные способы достижения этого, я надеюсь, что мне не хватает элегантного решения, которое, возможно, кто-то с лучшим знанием SwiftNIO и Fluent мог бы помочь.
Изменить: чтобы быть ясно, цель этого, очевидно, не в сумме количество действий в базе данных. Цель состоит в том, чтобы использовать курсор для синхронной обработки каждого действия. Когда я читаю результаты, я обнаруживаю изменения в действиях, а затем выбрасываю их в потоки обработки. Когда все потоки заняты, я не начинаю читать с курсора снова, пока они не завершатся.
Есть много таких действий, до 45 миллионов за один проход. Агрегирование обещаний и рекурсии не казалось хорошей идеей, и когда я попробовал это, просто ради этого сервер завис.
Это трудоемкая задача, которая может выполняться в течение нескольких дней в одном потоке, поэтому я не беспокоюсь о создании новых потоков. Проблема в том, что я не могу понять, как я могу использовать функцию wait() внутри команды, так как мне нужен контейнер для создания соединения с базой данных, и единственный доступ к которому у меня есть - context.container. Вызов wait() для этого приводит к вышеуказанная ошибка.
ТИА
1 ответ
Итак, как вы знаете, проблема заключается в следующих строках:
while ... {
...
try connection.raw("...").all(decoding: RecRegAction.self).wait()
...
}
Вы хотите дождаться ряда результатов, и поэтому вы используете while
петля и .wait()
для всех промежуточных результатов. По сути, это превращает асинхронный код в синхронный код в цикле событий. Это может привести к взаимоблокировкам и наверняка остановит другие соединения, поэтому SwiftNIO пытается обнаружить это и выдать вам эту ошибку. Я не буду вдаваться в подробности, почему это блокирует другие соединения или почему это может привести к тупикам в этом ответе.
Давайте посмотрим, какие варианты у нас есть, чтобы решить эту проблему:
- как вы говорите, мы могли бы просто иметь это
.wait()
в другом потоке, который не является одним из потоков цикла событий. Для этого любой неEventLoop
поток будет делать: либоDispatchQueue
или вы могли бы использоватьBlockingIOThreadPool
(который не работает наEventLoop
) - мы могли бы переписать ваш код, чтобы быть асинхронным
Оба решения будут работать, но (1) на самом деле не рекомендуется, так как вы должны сжечь весь (ядро) поток, просто чтобы дождаться результатов. И оба Dispatch
а также BlockingIOThreadPool
у них есть конечное число потоков, которые они готовы порождать, поэтому если вы будете делать это достаточно часто, у вас могут закончиться потоки, так что это займет еще больше времени.
Итак, давайте посмотрим, как мы можем вызывать асинхронную функцию несколько раз, одновременно накапливая промежуточные результаты. И тогда, если мы накопили все промежуточные результаты, продолжайте со всеми результатами.
Чтобы упростить задачу, давайте рассмотрим функцию, очень похожую на вашу. Мы предполагаем, что эта функция предоставляется так же, как в вашем коде
/// delivers partial results (integers) and `nil` if no further elements are available
func deliverPartialResult() -> EventLoopFuture<Int?> {
...
}
то, что мы хотели бы сейчас, это новая функция
func deliverFullResult() -> EventLoopFuture<[Int]>
пожалуйста, обратите внимание, как deliverPartialResult
возвращает одно целое число каждый раз и deliverFullResult
доставляет массив целых чисел (т.е. все целые числа). Хорошо, так как мы пишем deliverFullResult
без звонка deliverPartialResult().wait()
?
Как насчет этого:
func accumulateResults(eventLoop: EventLoop,
partialResultsSoFar: [Int],
getPartial: @escaping () -> EventLoopFuture<Int?>) -> EventLoopFuture<[Int]> {
// let's run getPartial once
return getPartial().then { partialResult in
// we got a partial result, let's check what it is
if let partialResult = partialResult {
// another intermediate results, let's accumulate and call getPartial again
return accumulateResults(eventLoop: eventLoop,
partialResultsSoFar: partialResultsSoFar + [partialResult],
getPartial: getPartial)
} else {
// we've got all the partial results, yay, let's fulfill the overall future
return eventLoop.newSucceededFuture(result: partialResultsSoFar)
}
}
}
Дано accumulateResults
, осуществляя deliverFullResult
больше не сложно
func deliverFullResult() -> EventLoopFuture<[Int]> {
return accumulateResults(eventLoop: myCurrentEventLoop,
partialResultsSoFar: [],
getPartial: deliverPartialResult)
}
Но давайте посмотрим больше на то, что accumulateResults
делает:
- это вызывает
getPartial
один раз, потом, когда он перезванивает - проверяет, есть ли у нас
- частичный результат, в этом случае мы помним это вместе с другим
partialResultsSoFar
и вернитесь к (1) nil
что значитpartialResultsSoFar
это все, что мы получаем, и мы возвращаем новое успешное будущее со всем, что мы уже собрали
- частичный результат, в этом случае мы помним это вместе с другим
это уже правда. Что мы сделали здесь - это превратили синхронный цикл в асинхронную рекурсию.
Хорошо, мы рассмотрели много кода, но как это теперь относится к вашей функции?
Хотите верьте, хотите нет, но на самом деле это должно работать (не проверено):
accumulateResults(eventLoop: el, partialResultsSoFar: []) {
connection.raw("FETCH \(chunkSize) FROM \(cursorName)")
.all(decoding: RecRegAction.self)
.map { results -> Int? in
if results.count > 0 {
return results.count
} else {
return nil
}
}
}.map { allResults in
return allResults.reduce(0, +)
}
Результатом всего этого будет EventLoopFuture<Int>
который несет сумму всех промежуточных result.count
,
Конечно, мы сначала собираем все ваши цифры в массив, а затем суммируем его (allResults.reduce(0, +)
) в конце, который немного расточительный, но также не конец света. Я оставил это так, потому что это делает accumulateResults
быть пригодным для использования в других случаях, когда вы хотите накапливать частичные результаты в массиве.
Теперь последнее, настоящее accumulateResults
Функция, вероятно, будет общей для типа элемента, а также мы можем устранить partialResultsSoFar
параметр для внешней функции. Как насчет этого?
func accumulateResults<T>(eventLoop: EventLoop,
getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> {
// this is an inner function just to hide it from the outside which carries the accumulator
func accumulateResults<T>(eventLoop: EventLoop,
partialResultsSoFar: [T] /* our accumulator */,
getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> {
// let's run getPartial once
return getPartial().then { partialResult in
// we got a partial result, let's check what it is
if let partialResult = partialResult {
// another intermediate results, let's accumulate and call getPartial again
return accumulateResults(eventLoop: eventLoop,
partialResultsSoFar: partialResultsSoFar + [partialResult],
getPartial: getPartial)
} else {
// we've got all the partial results, yay, let's fulfill the overall future
return eventLoop.newSucceededFuture(result: partialResultsSoFar)
}
}
}
return accumulateResults(eventLoop: eventLoop, partialResultsSoFar: [], getPartial: getPartial)
}
РЕДАКТИРОВАТЬ: После редактирования ваш вопрос предполагает, что вы на самом деле не хотите накапливать промежуточные результаты. Поэтому я предполагаю, что вместо этого вы хотите выполнить некоторую обработку после получения каждого промежуточного результата. Если это то, что вы хотите сделать, возможно, попробуйте это:
func processPartialResults<T, V>(eventLoop: EventLoop,
process: @escaping (T) -> EventLoopFuture<V>,
getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<V?> {
func processPartialResults<T, V>(eventLoop: EventLoop,
soFar: V?,
process: @escaping (T) -> EventLoopFuture<V>,
getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<V?> {
// let's run getPartial once
return getPartial().then { partialResult in
// we got a partial result, let's check what it is
if let partialResult = partialResult {
// another intermediate results, let's call the process function and move on
return process(partialResult).then { v in
return processPartialResults(eventLoop: eventLoop, soFar: v, process: process, getPartial: getPartial)
}
} else {
// we've got all the partial results, yay, let's fulfill the overall future
return eventLoop.newSucceededFuture(result: soFar)
}
}
}
return processPartialResults(eventLoop: eventLoop, soFar: nil, process: process, getPartial: getPartial)
}
Это будет (как и раньше) работать getPartial
пока не вернется nil
но вместо того, чтобы накапливать все getPartial
Результаты, это вызывает process
который получает частичный результат и может сделать некоторую дальнейшую обработку. Следующий getPartial
вызов произойдет, когда EventLoopFuture
process
возврат выполнен.
Это ближе к тому, что вы хотели бы?
Примечания: я использовал SwiftNIO EventLoopFuture
введите здесь, в Vapor вы бы просто использовать Future
вместо этого, но остальная часть кода должна быть такой же.
Вот общее решение, переписанное для NIO 2.16/Vapor 4 и как расширение EventLoop
extension EventLoop {
func accumulateResults<T>(getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> {
// this is an inner function just to hide it from the outside which carries the accumulator
func accumulateResults<T>(partialResultsSoFar: [T] /* our accumulator */,
getPartial: @escaping () -> EventLoopFuture<T?>) -> EventLoopFuture<[T]> {
// let's run getPartial once
return getPartial().flatMap { partialResult in
// we got a partial result, let's check what it is
if let partialResult = partialResult {
// another intermediate results, let's accumulate and call getPartial again
return accumulateResults(partialResultsSoFar: partialResultsSoFar + [partialResult],
getPartial: getPartial)
} else {
// we've got all the partial results, yay, let's fulfill the overall future
return self.makeSucceededFuture(partialResultsSoFar)
}
}
}
return accumulateResults(partialResultsSoFar: [], getPartial: getPartial)
}
}