Сбор результатов асинхронно от параллельного исполнителя gpars

У нас есть некоторый код на Java с использованием ThreadPoolExecutor и CompletionService. Задачи представлены большими партиями в пул; результаты отправляются в службу завершения, где мы собираем выполненные задачи, когда они доступны, не дожидаясь завершения всего пакета:

 ThreadPoolExecutor _executorService =
            new ThreadPoolExecutor(MAX_NUMBER_OF_WORKERS, new LinkedBlockingQueue(20));
 CompletionService _completionService =
            new ExecutorCompletionService<Callable>(_executorService)

//submit tasks
_completionService.submit( some task);

//get results
while(...){
   Future result = _completionService.poll(timeout);
   if(result)
      //process result
}

Общее количество работников в пуле составляет MAX_NUMBER_OF_WORKERS; задачи, представленные без доступного работника, ставятся в очередь; до 20 задач могут быть поставлены в очередь, после чего задачи отклоняются.

Что является аналогом Gpars для этого подхода?

Читая документацию по параллелизму gpars, я нашел много возможных вариантов: collectManyParallel(), anyParallel(), fork/joinи т.д., и я не уверен, какие из них даже проверить. Я надеялся найти какое-то упоминание о "завершении" или "обслуживании завершения" в качестве сравнения в документации, но ничего не нашел. Я ищу направление / указатели на то, с чего начать, если у вас есть опыт работы с gpars.

1 ответ

Сбор результатов на лету, сокращение производителей - это требует решения для потока данных. Пожалуйста, найдите пример запуска демо ниже:

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

int MAX_NUMBER_OF_WORKERS = 10

ThreadPoolExecutor _executorService =
        new ThreadPoolExecutor(MAX_NUMBER_OF_WORKERS, MAX_NUMBER_OF_WORKERS, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(200));

final group = new DefaultPGroup(new DefaultPool(_executorService))
final results = new DataflowQueue()

//submit tasks
30.times {value ->
    group.task(new Runnable() {
        @Override
        void run() {
            println 'Starting ' + Thread.currentThread()
            sleep 5000
            println 'Finished ' + Thread.currentThread()
            results.bind(value)
        }
    });
}
group.task {
    results << -1  //stop the consumer eventually
}

//get results
while (true) {
    def result = results.val
    println result
    if (result == -1) break
    //process result
}

group.shutdown()
Другие вопросы по тегам