GPars - правильный способ досрочного завершения параллельной коллекции

Каков наилучший способ завершить параллельный сбор (в случае исключения, выдаваемого одним из потоков, или прерывания, инициированного пользователем)?

В любом случае, я мог бы легко установить флаг и просто проверить его в верхней части цикла. Но если у меня есть 10000 предметов в коллекции, я бы лучше сказал всем, что подает их в ForkJoinPool, чтобы прекратить их подавать. Позвольте 5 или 20, которые уже начали, заканчиваться, но не начинайте больше.

Вот пример, который вы можете подключить, чтобы понять, что я имею в виду. Если я имитирую выдачу исключения одним из потоков, создается впечатление, что коллекция перестает обрабатывать, потому что первый вывод количества потоков (в сообщении "A:") часто очень мал (5 или около того). Однако, печатая счетчики за пределами GParsPool.withPool (в "B:"), я вижу, что они действительно продолжали работать (всегда все 100):

void doIt() {
    AtomicInteger threadCounter = new AtomicInteger(0)
    AtomicInteger threadCounterEnd = new AtomicInteger(0)

    try {
        GParsPool.withPool { pool ->

            try {
                (1..100).eachParallel {
                    try {
                        if (threadCounter.incrementAndGet() == 1) {
                            throw new RuntimeException('planet blew up!')
                        }

                        // Do some long work
                        Integer counter=0
                        (1..1000000).each() {counter++}

                        // Flag if we went all the way through
                        threadCounterEnd.incrementAndGet()
                    } catch (Exception exc) {
                        //pool.shutdownNow()
                        throw new RuntimeException(exc)
                    }
                }
            } catch (Exception exc) {
                println 'caught exception'
                //pool.shutdownNow()
                println "A: threads launched was ${threadCounter}, ended was ${threadCounterEnd}"
                throw new RuntimeException(exc)
            }
        }
    } catch (Exception exc) {
        exc.printStackTrace()
    }

    println "B: threads launched was ${threadCounter}, ended was ${threadCounterEnd}"
}

Если я использую pool.shutdown() внутри каждой параллели, это никак не повлияет. Если я использую pool.shutdownNow(), он завершает обработку так, как я хочу, но выдает исключение CancellationException, которое скрывает реальное исключение, которое я хотел бы получить. Я мог бы сохранить "реальное" исключение в переменной для последующего доступа, но я должен задаться вопросом, не существует ли лучшего способа сказать, что параллельная коллекция должна быть чисто остановлена.

1 ответ

Решение

Там нет такого механизма банкомата. Поскольку пул не может различить ваши задачи и задачи других потенциальных вычислений, у него нет механизма, чтобы помешать потокам читать очереди задач. Если вы знаете, что являетесь единственным пользователем пула потоков, вы можете использовать shutdownNow(), как вы предлагаете (с указанными последствиями).

Другой вариант, который, я думаю, вы также знаете, заключается в том, чтобы правильно перехватывать исключения в задачах напрямую и устанавливать общий атомарный флаг, чтобы указывать другим задачам на немедленное завершение.

Другие вопросы по тегам