Как узнать с помощью GPars, что все потоки закончились, когда выдается исключение?

В случае, когда поток выбрасывает исключение, как я могу ждать, пока все потоки, которые не выдавали исключение, не закончатся (чтобы пользователь не запустился снова, пока все не остановилось)?

Я использую GPars несколькими различными способами, поэтому мне нужна стратегия для каждого (параллельные коллекции, асинхронные замыкания и разветвление / соединение). Исключения не скрываются, они хорошо обрабатываются с помощью обещаний, getChildrenResults и т. Д., Так что это не проблема (благодаря ответам Вацлава Печа). Мне просто нужно убедиться, что основной поток ожидает, пока все, что еще работает, завершится или остановится иным образом.

Например, при использовании параллельных коллекций некоторые потоки продолжают работать, а некоторые никогда не запускаются после исключения. Так что нелегко сказать, сколько там ждать, или заполучить их, возможно.

Я думаю, что, возможно, есть способ работать с пулом потоков (в данном случае GParsPool). Какие-либо предложения?

Спасибо!

2 ответа

Решение

Я считаю, что у меня есть решение этой проблемы, я реализовал его в приложении после тщательного тестирования, и оно работает.

Закрытие withPool передается в созданном пуле (a jsr166y.ForkJoinPool) в качестве первого аргумента. Я могу взять это и сохранить в переменной (currentPool), чтобы позже использовать его основным потоком, например так:

    GParsPool.withPool { pool ->
        currentPool = pool

Когда выдается исключение и возвращается к основному потоку для обработки, я могу заставить его ждать, пока все не закончится, что-то вроде этого:

    } catch (Exception exc) {
        if (currentPool) {
            while (!currentPool.isQuiescent()) {
                Thread.sleep(100)
                println 'waiting for threads to finish'
            }
        }

        println 'all done'
    }

IsQuiescent(), кажется, является безопасным способом убедиться, что больше не нужно ничего делать.

Обратите внимание, что во время тестирования я также обнаружил, что исключения, похоже, не прекращают выполнение цикла, как я изначально думал. Если бы у меня был список 500 и я делал каждый Параллель, они все работали независимо от того, была ли ошибка в первом из них. Поэтому мне пришлось завершить цикл с помощью currentPool.shutdownNow() внутри обработчика исключений параллельного цикла. Смотрите также: GPars - правильный способ досрочного завершения параллельной коллекции

Вот полное упрощенное представление фактического решения:

void example() {
    jsr166y.ForkJoinPool currentPool

    AtomicInteger threadCounter = new AtomicInteger(0)
    AtomicInteger threadCounterEnd = new AtomicInteger(0)

    AtomicReference<Exception> realException = new AtomicReference<Exception>()

    try {
        GParsPool.withPool { pool ->
            currentPool = pool

            (1..500).eachParallel {
                try {
                    if (threadCounter.incrementAndGet() == 1) {
                        throw new RuntimeException('planet blew up!')
                    }

                    if (realException.get() != null) {
                        // We had an exception already in this eachParallel - quit early
                        return
                    }

                    // Do some long work
                    Integer counter=0
                    (1..1000000).each() {counter++}

                    // Flag if we went all the way through
                    threadCounterEnd.incrementAndGet()
                } catch (Exception exc) {
                    realException.compareAndSet(null, exc)

                    pool.shutdownNow()
                    throw realException
                }
            }
        }
    } catch (Exception exc) {
        // If we used pool.shutdownNow(), we need to look at the real exception.
        // This is needed because pool.shutdownNow() sometimes generates a CancellationException
        // which can cover up the real exception that caused us to do a shutdownNow().
        if (realException.get()) {
            exc = realException.get()
        }

        if (currentPool) {
            while (!currentPool.isQuiescent()) {
                Thread.sleep(100)
                println 'waiting for threads to finish'
            }
        }

        // Do further exception handling here...
        exc.printStackTrace()
    }
}

Возвращаясь к моему более раннему примеру, если я впервые выбрасывал исключение на 4-ядерном компьютере, в очереди было около 5 потоков. Функция shutdownNow () обрезала бы вещи примерно через 20 или около того потоков, так что проверка "выйти раньше" в верхней части помогла этим 20 или около того выйти как можно скорее.

Просто разместите это здесь на случай, если это поможет кому-то другому, взамен на всю помощь, которую я получил здесь. Спасибо!

Я полагаю, что вам нужно будет перехватить исключение, а затем вернуть что-то, отличное от ожидаемого результата (например, String или null, если вы ожидаете число, например), т.е.

@Grab('org.codehaus.gpars:gpars:0.12')
import static groovyx.gpars.GParsPool.*

def results = withPool {
  [1,2,3].collectParallel {
    try {
      if( it % 2 == 0 ) {
        throw new RuntimeException( '2 fails' )
      }
      else {
        Thread.sleep( 2000 )
        it
      }
    }
    catch( e ) { e.class.name }
  }
}
Другие вопросы по тегам