Сбор результатов асинхронно от параллельного исполнителя gpars
У нас есть некоторый код на Java с использованием ThreadPoolExecutor и CompletionService. Задачи представлены большими партиями в пул; результаты отправляются в службу завершения, где мы собираем выполненные задачи, когда они доступны, не дожидаясь завершения всего пакета:
ThreadPoolExecutor _executorService =
new ThreadPoolExecutor(MAX_NUMBER_OF_WORKERS, new LinkedBlockingQueue(20));
CompletionService _completionService =
new ExecutorCompletionService<Callable>(_executorService)
//submit tasks
_completionService.submit( some task);
//get results
while(...){
Future result = _completionService.poll(timeout);
if(result)
//process result
}
Общее количество работников в пуле составляет MAX_NUMBER_OF_WORKERS; задачи, представленные без доступного работника, ставятся в очередь; до 20 задач могут быть поставлены в очередь, после чего задачи отклоняются.
Что является аналогом Gpars для этого подхода?
Читая документацию по параллелизму gpars, я нашел много возможных вариантов: collectManyParallel()
, anyParallel()
, fork/join
и т.д., и я не уверен, какие из них даже проверить. Я надеялся найти какое-то упоминание о "завершении" или "обслуживании завершения" в качестве сравнения в документации, но ничего не нашел. Я ищу направление / указатели на то, с чего начать, если у вас есть опыт работы с gpars.
1 ответ
Сбор результатов на лету, сокращение производителей - это требует решения для потока данных. Пожалуйста, найдите пример запуска демо ниже:
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
int MAX_NUMBER_OF_WORKERS = 10
ThreadPoolExecutor _executorService =
new ThreadPoolExecutor(MAX_NUMBER_OF_WORKERS, MAX_NUMBER_OF_WORKERS, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(200));
final group = new DefaultPGroup(new DefaultPool(_executorService))
final results = new DataflowQueue()
//submit tasks
30.times {value ->
group.task(new Runnable() {
@Override
void run() {
println 'Starting ' + Thread.currentThread()
sleep 5000
println 'Finished ' + Thread.currentThread()
results.bind(value)
}
});
}
group.task {
results << -1 //stop the consumer eventually
}
//get results
while (true) {
def result = results.val
println result
if (result == -1) break
//process result
}
group.shutdown()