Чем фреймворк fork/join лучше, чем пул потоков?

Каковы преимущества использования новой структуры fork/join по сравнению с простым разделением большой задачи на N подзадач в начале, отправкой их в пул кэшированных потоков (из Executors) и ожиданием завершения каждой задачи? Я не вижу, как использование абстракции fork/join упрощает проблему или делает решение более эффективным по сравнению с тем, что было у нас в течение многих лет.

Например, алгоритм распараллеленного размытия в учебном примере может быть реализован так:

public class Blur implements Runnable {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    private int mBlurWidth = 15; // Processing window size, should be odd.

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    public void run() {
        computeDirectly();
    }

    protected void computeDirectly() {
        // As in the example, omitted for brevity
    }
}

Разделите в начале и отправьте задачи в пул потоков:

// source image pixels are in src
// destination image pixels are in dst
// threadPool is a (cached) thread pool

int maxSize = 100000; // analogous to F-J's "sThreshold"
List<Future> futures = new ArrayList<Future>();

// Send stuff to thread pool:
for (int i = 0; i < src.length; i+= maxSize) {
    int size = Math.min(maxSize, src.length - i);
    ForkBlur task = new ForkBlur(src, i, size, dst);
    Future f = threadPool.submit(task);
    futures.add(f);
}

// Wait for all sent tasks to complete:
for (Future future : futures) {
    future.get();
}

// Done!

Задачи идут в очередь пула потоков, из которой они выполняются, когда рабочие потоки становятся доступными. Пока разделение достаточно гранулировано (чтобы избежать особого ожидания последней задачи) и пул потоков имеет достаточно (по крайней мере, N процессоров) потоков, все процессоры работают на полной скорости, пока не будут выполнены все вычисления.

Я что-то пропустил? Какова дополнительная ценность использования структуры fork/join?

10 ответов

Решение

Я думаю, что основное недоразумение состоит в том, что примеры Fork/Join НЕ показывают кражу работы, а лишь своего рода стандартную игру "разделяй и властвуй".

Воровство работы было бы так: работник Б закончил свою работу. Он добрый, поэтому он оглядывается и видит, что работник А все еще усердно работает. Он подходит и спрашивает: "Эй, парень, я могу помочь тебе". Ответы. "Круто, у меня есть задание в 1000 единиц. Пока я закончил 345, оставив 655. Не могли бы вы поработать с номерами 673–1000, я выполню 346–672". Б говорит: "Хорошо, давайте начнем, чтобы мы могли пойти в паб раньше".

Видите ли, рабочие должны общаться друг с другом, даже когда они начали настоящую работу. Это недостающая часть в примерах.

Примеры, с другой стороны, показывают только что-то вроде "использования субподрядчиков":

Работник А: "Черт, у меня есть 1000 единиц работы. Слишком много для меня. Я сам сделаю 500 и передам 500 другим подрядчикам". Это продолжается до тех пор, пока большая задача не будет разбита на маленькие пакеты по 10 единиц в каждом. Они будут выполнены доступными работниками. Но если один пакет представляет собой отравленную таблетку и занимает значительно больше времени, чем другие пакеты - не повезло, фаза разделения закончена.

Единственное оставшееся различие между Fork/Join и предварительным разделением задачи заключается в следующем: при предварительном разделении рабочая очередь заполняется с самого начала. Пример: 1000 единиц, порог 10, поэтому в очереди 100 записей. Эти пакеты распределяются между членами пула потоков.

Fork/Join более сложен и старается уменьшить количество пакетов в очереди:

  • Шаг 1: Поместите один пакет, содержащий (1...1000) в очередь
  • Шаг 2: Один рабочий извлекает пакет (1...1000) и заменяет его двумя пакетами: (1...500) и (501...1000).
  • Шаг 3: Один рабочий выталкивает пакет (500...1000) и нажимает (500...750) и (751...1000).
  • Шаг n: стек содержит эти пакеты: (1..500), (500...750), (750...875)... (991..1000)
  • Шаг n+1: пакет (991..1000) извлечен и выполнен
  • Шаг n+2: Пакет (981..990) извлечен и выполнен
  • Шаг n+3: Пакет (961..980) всплывает и разделяется на (961...970) и (971..980). ....

Вы видите: в Fork/Join очередь меньше (в примере 6), а фазы "split" и "work" чередуются.

Когда несколько рабочих одновременно появляются и толкают друг друга, взаимодействия, конечно, не так очевидны.

Если у вас n занятых потоков, все они работают на 100% независимо, это будет лучше, чем n потоков в пуле Fork-Join (FJ). Но так никогда не получается.

Возможно, не удастся точно разбить проблему на n равных частей. Даже если вы это сделаете, планирование потоков может быть справедливым. В конечном итоге вы ожидаете самую медленную ветку. Если у вас есть несколько задач, то каждый из них может выполняться с параллелизмом менее чем n (как правило, более эффективным), но все же перейти к n-way, когда другие задачи завершены.

Так почему бы нам просто не разбить проблему на куски размера FJ и не поработать над этим с пулом потоков. Типичное использование FJ разбивает проблему на мелкие кусочки. Выполнение этого в случайном порядке требует большой координации на аппаратном уровне. Накладные расходы были бы убийцей. В FJ задачи помещаются в очередь, которую поток считывает в порядке "последний пришел - первый вышел" (LIFO/ стек), а кража работы (как правило, в основной работе) выполняется "первым пришел - первым вышел" (FIFO/"очередь"). В результате обработка длинного массива может выполняться в значительной степени последовательно, даже если она разбита на крошечные куски. (Это также тот случай, когда может быть нетривиальным разбить проблему на небольшие куски одинакового размера за один большой взрыв. Скажем, имея дело с некоторой формой иерархии без балансировки.)

Вывод: FJ позволяет более эффективно использовать аппаратные потоки в неравных ситуациях, что будет всегда, если у вас более одного потока.

Конечная цель пулов потоков и Fork/Join одинакова: оба хотят максимально использовать доступную мощность ЦП для максимальной пропускной способности. Максимальная пропускная способность означает, что как можно больше задач должно быть выполнено за длительный период времени. Что для этого нужно? (В дальнейшем мы будем предполагать, что нет недостатка в вычислительных задачах: всегда достаточно сделать для 100% загрузки ЦП. Дополнительно я использую "ЦП" для ядер или виртуальных ядер в случае гиперпоточности).

  1. По крайней мере, должно быть столько потоков, сколько есть доступных процессоров, потому что запуск меньшего количества потоков приведет к тому, что ядро ​​не будет использоваться.
  2. Максимально должно быть запущено столько потоков, сколько имеется доступных процессоров, потому что запуск большего количества потоков создаст дополнительную нагрузку для планировщика, который назначает процессоры различным потокам, что приводит к тому, что некоторое время процессора идет на планировщик, а не на нашу вычислительную задачу.

Таким образом, мы выяснили, что для максимальной пропускной способности нам нужно иметь одинаковое количество потоков, чем у процессоров. В примере размытия Oracle вы можете взять пул потоков фиксированного размера с количеством потоков, равным количеству доступных процессоров, или использовать пул потоков. Это не имеет значения, вы правы!

Так когда же у вас возникнут проблемы с пулами потоков? Это если поток блокируется, потому что ваш поток ожидает завершения другой задачи. Предположим следующий пример:

class AbcAlgorithm implements Runnable {
    public void run() {
        Future<StepAResult> aFuture = threadPool.submit(new ATask());
        StepBResult bResult = stepB();
        StepAResult aResult = aFuture.get();
        stepC(aResult, bResult);
    }
}

Здесь мы видим алгоритм, который состоит из трех этапов A, B и C. A и B могут выполняться независимо друг от друга, но для этапа C требуется результат этапа A и B. То, что делает этот алгоритм, это передать задачу A пул потоков и выполнить задачу б напрямую. После этого поток будет ожидать выполнения задачи A и продолжит выполнение шага C. Если A и B завершены одновременно, то все в порядке. Но что, если A займет больше времени, чем B? Это может быть связано с тем, что природа задачи A диктует это, но это также может быть связано с тем, что в начале нет задачи для задачи A, и задача A должна ждать. (Если доступен только один процессор, и, следовательно, в вашем пуле потоков есть только один поток, это даже вызовет взаимоблокировку, но на данный момент это не имеет значения). Дело в том, что поток, который только что выполнил задачу B, блокирует весь поток. Поскольку у нас столько же потоков, сколько у процессоров, и один поток заблокирован, это означает, что один процессор простаивает.

Fork/Join решает эту проблему: в рамках fork/join вы бы написали тот же алгоритм, как показано ниже:

class AbcAlgorithm implements Runnable {
    public void run() {
        ATask aTask = new ATask());
        aTask.fork();
        StepBResult bResult = stepB();
        StepAResult aResult = aTask.join();
        stepC(aResult, bResult);
    }
}

Выглядит так же, не так ли? Однако подсказка в том, что aTask.join не будет блокировать. Вместо этого вот где воровство работы вступает в игру: поток будет искать другие задачи, которые были разветвлены в прошлом, и продолжит выполнение тех. Сначала он проверяет, начали ли обрабатываться задачи, которые он разветвлял. Поэтому, если A еще не был запущен другим потоком, он будет делать A следующим, в противном случае он проверит очередь других потоков и похитит их работу. Как только эта другая задача другого потока будет завершена, она проверит, завершена ли сейчас A. Если это выше, алгоритм может вызвать stepC, В противном случае он будет искать еще одну задачу, чтобы украсть. Таким образом, пулы разветвления / объединения могут достигать 100% загрузки ЦП даже в условиях блокирующих действий.

Однако есть ловушка: кража труда возможна только для join зов ForkJoinTask s. Это невозможно сделать для внешних блокирующих действий, таких как ожидание другого потока или ожидание действия ввода-вывода. Так что же, ожидание завершения ввода-вывода - обычная задача? В этом случае, если бы мы могли добавить дополнительный поток в пул Fork/Join, который будет снова остановлен, как только будет выполнено блокирующее действие, это будет вторым лучшим решением. И ForkJoinPool может на самом деле сделать это, если мы используем ManagedBlocker s.

Фибоначчи

В JavaDoc для RecursiveTask есть пример для вычисления чисел Фибоначчи с использованием Fork/Join. Для классического рекурсивного решения смотрите:

public static int fib(int n) {
    if (n <= 1) {
        return n;
    }
    return fib(n - 1) + fib(n - 2);
}

Как объясняется в JavaDocs, это довольно простой способ вычисления чисел Фибоначчи, так как этот алгоритм имеет сложность O(2^n), хотя возможны и более простые способы. Однако этот алгоритм очень прост и легок для понимания, поэтому мы придерживаемся его. Давайте предположим, что мы хотим ускорить это с помощью Fork/Join. Наивная реализация будет выглядеть так:

class Fibonacci extends RecursiveTask<Long> {
    private final long n;

    Fibonacci(long n) {
        this.n = n;
    }

    public Long compute() {
        if (n <= 1) {
            return n;
        }
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();
   }
}

Шаги, на которые эта Задача разбита, слишком короткие, и, следовательно, это будет работать ужасно, но вы можете видеть, как каркас в целом работает очень хорошо: два слагаемых могут быть вычислены независимо, но тогда нам понадобятся оба из них для построения окончательного результат. Так что одна половина делается в другой теме. Получайте удовольствие, делая то же самое с пулами потоков без тупиков (возможно, но не так просто).

Просто для полноты: если вы действительно хотите рассчитать числа Фибоначчи, используя этот рекурсивный подход, вот оптимизированная версия:

class FibonacciBigSubtasks extends RecursiveTask<Long> {
    private final long n;

    FibonacciBigSubtasks(long n) {
        this.n = n;
    }

    public Long compute() {
        return fib(n);
    }

    private long fib(long n) {
        if (n <= 1) {
            return 1;
        }
        if (n > 10 && getSurplusQueuedTaskCount() < 2) {
            final FibonacciBigSubtasks f1 = new FibonacciBigSubtasks(n - 1);
            final FibonacciBigSubtasks f2 = new FibonacciBigSubtasks(n - 2);
            f1.fork();
            return f2.compute() + f1.join();
        } else {
            return fib(n - 1) + fib(n - 2);
        }
    }
}

Это значительно уменьшает подзадачи, потому что они разделяются только тогда, когда n > 10 && getSurplusQueuedTaskCount() < 2 Значение true, что означает, что нужно выполнить более 100 вызовов методов (n > 10) и там не очень мужские задачи уже ждут (getSurplusQueuedTaskCount() < 2).

На моем компьютере (4 ядра (8 при подсчете Hyper-Threading), процессор Intel® Core iM7-2720QM @ 2,20 ГГц) fib(50) занимает 64 секунды с классическим подходом и всего 18 секунд с подходом Fork/Join, что является довольно заметным выигрышем, хотя и не настолько, насколько теоретически возможно.

Резюме

  • Да, в вашем примере Fork/Join не имеет преимущества перед классическими пулами потоков.
  • Fork/Join может значительно улучшить производительность при блокировке
  • Fork/Join позволяет обойти некоторые тупиковые проблемы

Fork/join отличается от пула потоков, потому что он реализует кражу работы. От вилки / Регистрация

Как и в любом ExecutorService, инфраструктура fork/join распределяет задачи среди рабочих потоков в пуле потоков. Структура fork/join отличается тем, что использует алгоритм кражи работы. Рабочие потоки, у которых заканчивается работа, могут украсть задачи из других потоков, которые все еще заняты.

Допустим, у вас есть два потока и 4 задания a, b, c, d, которые занимают 1, 1, 5 и 6 секунд соответственно. Первоначально a и b назначаются потоку 1, а c и d - потоку 2. В пуле потоков это займет 11 секунд. С помощью fork/join поток 1 завершает работу и может украсть работу из потока 2, поэтому задача d в ​​конечном итоге будет выполнена потоком 1. Поток 1 выполняет a, b и d, поток 2 просто c. Общее время: 8 секунд, а не 11.

РЕДАКТИРОВАТЬ: Как отмечает Joonas, задачи не обязательно предварительно выделены для потока. Идея fork/join заключается в том, что поток может выбрать разделение задачи на несколько частей. Итак, чтобы повторить вышеизложенное:

У нас есть две задачи (ab) и (cd), которые занимают 2 и 11 секунд соответственно. Поток 1 начинает выполнять ab и разделяет его на две подзадачи a & b. Аналогично с потоком 2 он разделяется на две подзадачи c & d. Когда поток 1 закончил a & b, он может украсть d из потока 2.

В этом примере Fork/Join не добавляет значения, поскольку разветвление не требуется, и рабочая нагрузка равномерно распределяется по рабочим потокам. Fork/Join только добавляет накладные расходы.

Вот хорошая статья на эту тему. Цитата:

В целом, можно сказать, что ThreadPoolExecutor предпочтительнее, когда рабочая нагрузка равномерно распределяется между рабочими потоками. Чтобы гарантировать это, вам нужно точно знать, как выглядят входные данные. Напротив, ForkJoinPool обеспечивает хорошую производительность независимо от входных данных и, таким образом, является значительно более надежным решением.

Все, кто выше, правы, выгоды от кражи труда достигаются, но если говорить о том, почему это так.

Основным преимуществом является эффективная координация между рабочими потоками. Работу нужно разделить и собрать, что требует координации. Как видно из ответа AH выше, у каждого потока есть свой рабочий список. Важным свойством этого списка является его сортировка (большие задачи вверху и маленькие задачи внизу). Каждый поток выполняет задачи внизу своего списка и крадет задачи сверху списка других потоков.

Результат этого:

  • Голова и хвост списков задач могут синхронизироваться независимо, что уменьшает конкуренцию в списке.
  • Значительные поддеревья работы разделяются и повторно собираются одним и тем же потоком, поэтому для этих поддеревьев не требуется координация между потоками.
  • Когда поток крадет работу, он берет большой кусок, который затем подразделяет на собственный список
  • Стальная обработка означает, что резьба почти полностью используется до конца процесса.

Большинство других схем "разделяй и властвуй", использующих пулы потоков, требуют большего взаимодействия и координации между потоками.

Кажется, еще одно важное отличие состоит в том, что с помощью FJ вы можете выполнять несколько сложных этапов "Соединение". Рассмотрим сортировку слиянием по http://faculty.ycp.edu/~dhovemey/spring2011/cs365/lecture/lecture18.html Для предварительного разделения этой работы потребуется слишком много оркестровки. Например, вам нужно сделать следующее:

  • сортировать первую четверть
  • сортировать второй квартал
  • объединить первые 2 квартала
  • сортировать третий квартал
  • сортировать четвертую четверть
  • объединить последние 2 квартала
  • объединить 2 половинки

Как указать, что вы должны выполнять сортировку до слияния, которое их касается и т. Д.

Я искал, как лучше сделать определенную вещь для каждого из списка предметов. Я думаю, что я просто предварительно разделю список и буду использовать стандартный ThreadPool. FJ кажется наиболее полезным, когда работа не может быть предварительно разделена на достаточно независимые задачи, но может быть рекурсивно разделена на задачи, которые не зависят друг от друга (например, сортировка половин независима, но объединение 2 отсортированных половин в отсортированное целое не является).

F/J также имеет явное преимущество, когда у вас есть дорогие операции слияния. Поскольку он разбивается на древовидную структуру, вы выполняете только слияния log2(n), а не n с линейным разделением потоков. (Это делает теоретическое предположение, что у вас столько же процессоров, сколько потоков, но все же преимущество). Для выполнения домашнего задания нам пришлось объединить несколько тысяч двумерных массивов (все одинаковые измерения) путем суммирования значений в каждом индексе. С процессорами fork join и P время приближается к log2(n), а P приближается к бесконечности.

1 2 3.. 7 3 1.... 8 5 4
4 5 6 + 2 4 3 => 6 9 9
7 8 9.. 1 1 0.... 8 9 9

Я хотел бы добавить короткий ответ для тех, у кого мало времени читать длинные ответы. Сравнение взято из книги Applied Akka Patterns:

Ваше решение относительно того, следует ли использовать исполнителя fork-join или исполнителя пула потоков, в значительной степени зависит от того, будут ли операции в этом диспетчере блокироваться. Исполнитель fork-join дает вам максимальное количество активных потоков, тогда как исполнитель пула потоков дает вам фиксированное количество потоков. Если потоки заблокированы, исполнитель fork-join создаст больше, а исполнитель пула потоков - нет. Для блокирующих операций вам обычно лучше использовать пул-исполнитель потоков, поскольку он предотвращает увеличение количества потоков. Более "реактивные" операции лучше выполнять в fork-join-executeor.

Если проблема такова, что нам нужно дождаться завершения других потоков (как в случае сортировки массива или суммы массива), следует использовать fork join, поскольку Executor(Executors.newFixedThreadPool(2)) будет подавлен из-за ограниченного количество потоков. В этом случае пул forkjoin создаст больше потоков, чтобы прикрыть заблокированный поток для сохранения того же параллелизма.

Источник: http://www.oracle.com/technetwork/articles/java/fork-join-422606.html

Проблема с исполнителями для реализации алгоритмов "разделяй и властвуй" не связана с созданием подзадач, потому что вызываемый объект может свободно отправлять новую подзадачу своему исполнителю и ожидать ее результата синхронно или асинхронно. Проблема заключается в параллелизме: когда вызываемый объект ожидает результата другого вызываемого объекта, он переводится в состояние ожидания, тратя впустую возможность обработки другого вызываемого объекта, поставленного в очередь на выполнение.

Фреймворк fork/join, добавленный в пакет java.util.concurrent в Java SE 7 благодаря усилиям Дуга Ли, восполняет этот пробел

Источник: https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinPool.html

Пул пытается поддерживать достаточно активных (или доступных) потоков путем динамического добавления, приостановки или возобновления внутренних рабочих потоков, даже если некоторые задачи останавливаются в ожидании присоединения к другим. Однако такие настройки не гарантируются при заблокированном вводе-выводе или другой неуправляемой синхронизации.

public int getPoolSize() Возвращает количество рабочих потоков, которые были запущены, но еще не завершены. Результат, возвращаемый этим методом, может отличаться от getParallelism(), когда потоки создаются для поддержания параллелизма, когда другие блокируются совместно.

Вы будете поражены производительностью ForkJoin в таких приложениях, как crawler. Вот лучший урок, из которого вы могли бы поучиться.

Логика Fork/Join очень проста: (1) разделить (разветвлять) каждую большую задачу на более мелкие; (2) обрабатывать каждую задачу в отдельном потоке (при необходимости разделяя их на еще более мелкие задачи); (3) присоединиться к результатам.

Другие вопросы по тегам