Обработка gpars dataflowQueues или конвейеры, похоже, запускаются только по запросу df.val
Нужна помощь. Глядя на потоки данных / конвейеры Gpars, но что-то я не понимаю
если вы посмотрите на пример ниже (я сделал это с операторами, трубопроводами, chainWith и столкнулся с той же проблемой).
В этом примере я использовал задачи, но с таким же успехом мог бы быть и без них, и те же самые проблемы проявляются. В этом примере я настроил две DataflowQueues, одну для начальных условий и одну для результатов оценки по предикату. Затем я размечаю конвейер, который сравнивает входные данные с входными данными по предикату (это даже тест) и сохраняет результаты в очереди результатов.
настроив конвейер и разместив некоторые записи в первой очереди, я полагал, что записи будут обработаны, так как данные были доступны (это не сработало и для версии оператора), как вы можете видеть, я проверяю размер результата Q его ноль (если я удалите задачу, которая все еще выполняется) после того, как я запишу записи в sessionQ. Таким образом, запись данных не "запускает" обработку.
Первая задача - сохранить количество записей в очереди.
import groovyx.gpars.dataflow.Dataflow
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.Promise
/**
* Created by will on 13/01/2017.
*/
def iValues = [1,2,3,4,5]
DataflowQueue sessionQ = new DataflowQueue()
DataflowQueue resultQ = new DataflowQueue()
Dataflow.task {
println "setup task: set initial conditions list for rule predicate "
iValues.each {sessionQ << it}
}
Closure evenPredicate = {it %2 == 0}
//layout pipeline
sessionQ | evenPredicate | resultQ
assert resultQ.iterator().size() == 0
Promise ans = Dataflow.task {
println "result task : get three values from result q "
def outlist = []
3.times {
def res = resultQ.val
println "got result $res"
outlist << res
}
assert sessionQ.iterator().size() == 0
assert resultQ.iterator().size() == 2
outlist
}
println "ans list is $ans.val"
assert resultQ.iterator().size() == 2
это только во второй задаче / chainWith и т. д., где вы вызываете.val (или get()) во второй очереди, которую запускает движок, и ВСЕ записи обрабатываются из первой очереди и результаты связываются с resultQ.
Это видно из утверждений, так как после выполнения первых вызовов синхронизации (.val) механизм запускается и обрабатывает ВСЕ связанные записи в начальном сеансе Q.
Это проблема, так как до тех пор, пока вы не выполните этот первый вызов.val - если вы выполняете poll () или resultQ.interator.size(), например, он пустой и несвязанный, size()=0. так что вы не можете написать
for (dfRes in resultQ) {//do something with dfRes}
как всегда пусто, пока вы не потребите первый элемент из сессии. Я не понимаю почему? После того, как записи связаны с первым dataflowQueue, я думал, что элементы будут израсходованы, КАК они стали доступны (связаны) - но это не так.
теперь это сложно, так как вы не можете получить записи, проверить размер результатов, выполнить poll(), на resultQ, так как он потерпит неудачу, пока не будет прочитан первый DF из sessionQ.
В итоге мне пришлось использовать размер массива начальных значений (сообщает мне записи, сохраненные в очереди), поскольку ЕДИНСТВЕННЫЙ означает считывать одно и то же число обратно из resultQ, чтобы очистить его (в приведенном выше примере я потреблял только 3 записей из результатов Q, и утверждение показывает, что в resultQ еще осталось 2 записи (но только после того, как будет сделан первый вызов.val, если вы закомментируете эту строку, все утверждения начнут давать сбой)
Я попробовал это с Dataflow.operator, Pipeline и т. Д. И получил ту же проблему. почему работа не обрабатывается, поскольку каждый вход связан с SessionQ?
наконец, в случае с конвейером существует метод.complete(), который, если вы обрабатываете замыкание {} в конвейере, остается открытым (!complete()), но когда вы запускаете метод, подобный.binaryChoice(), он помечает Конвейер завершен и дальнейшие действия не могут быть добавлены. Почему это сделано?
Конечно, я не понимаю, что говорит это состояние (больше не будет обработки), и будет выдано исключение, если вы попытаетесь сделать следующий шаг после такого метода.
в любом случае - я попробовал трубопроводную линию, как это
Pipeline pipeLine = new Pipeline(Q)
pipeLine.tap(log).binaryChoice(evenPathQ, oddPathQ) {println "$it %2 is ${it%2 ==0}"; (it%2 == 0) }
однако, когда вы связываете значения с Q, ничего не происходит - пока вы не потребите вывод, как
odd.val
когда вдруг конвейер "запускается" и обрабатывает ВСЕ элементы DF, хранящиеся в Q.
Ничто из того, что я пробовал, не запускает планирование работы, кроме первого потребления.val
Могу ли я объяснить, почему это так, я должен пропустить момент здесь, но это "ничего не делать" до тех пор, пока не будет прочитана первая запись, это НЕ то, что я ожидал, и аннулирует любую оценку размера (.iterator.size(), poll () и т. д.) вызов типа для цели DataflowWriteChannel.
Я был бы признателен за любую помощь в этом - я боролся с этим в течение двух дней и не получил нигде. Я также посмотрел на все тесты Gpars, и они просто вызывают.val столько раз, сколько связаны входные данные - так что не показывайте проблему, которую я описал.
Вацлав Печ, или любой другой гуру Gpars, который следит за вопросами, я был бы признателен за любую подсказку, чтобы помочь мне преодолеть этот горб
С уважением заранее
1 ответ
Небольшая модификация (добавление задержки) непосредственно перед утверждением, что размер равен 0, покажет, что вычисление инициируется записанными данными:
//layout pipeline
sessionQ | evenPredicate | resultQ
sleep 5000
assert resultQ.iterator().size() == 0