Пользовательский пул потоков в параллельном потоке Java 8

Можно ли указать пользовательский пул потоков для параллельного потока Java 8? Я не могу найти это нигде.

Представьте, что у меня есть серверное приложение, и я хотел бы использовать параллельные потоки. Но приложение большое и многопоточное, поэтому я хочу разделить его. Я не хочу медленное выполнение задачи в одном модуле задач блока приложения из другого модуля.

Если я не могу использовать разные пулы потоков для разных модулей, это означает, что я не могу безопасно использовать параллельные потоки в большинстве реальных ситуаций.

Попробуйте следующий пример. Есть несколько ресурсоемких задач, выполняемых в отдельных потоках. Задачи используют параллельные потоки. Первая задача не выполняется, поэтому каждый шаг занимает 1 секунду (имитируется спящий поток). Проблема в том, что другие потоки застревают и ждут, пока не завершится неработающая задача. Это надуманный пример, но представьте себе приложение сервлета и того, кто отправляет долгосрочную задачу в общий пул разветвлений.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

17 ответов

Решение

На самом деле есть хитрость, как выполнить параллельную операцию в конкретном пуле разветвления. Если вы выполняете его как задачу в пуле разветвления, он остается там и не использует общий.

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
    //parallel task here, for example
    IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();

Уловка основана на ForkJoinTask.fork, который указывает: "Обеспечивает асинхронное выполнение этой задачи в пуле, в котором выполняется текущая задача, если это применимо, или с использованием ForkJoinPool.commonPool(), если не inForkJoinPool()"

Параллельные потоки используют значение по умолчанию ForkJoinPool.commonPool который по умолчанию имеет на один поток меньше, чем у вас процессоры, как возвращается Runtime.getRuntime().availableProcessors() (Это означает, что параллельные потоки используют все ваши процессоры, потому что они также используют основной поток):

Для приложений, которым требуются отдельные или пользовательские пулы, ForkJoinPool может быть создан с заданным целевым уровнем параллелизма; по умолчанию равно количеству доступных процессоров.

Это также означает, что если у вас есть вложенные параллельные потоки или несколько параллельных потоков, запущенных одновременно, все они будут использовать один и тот же пул. Преимущество: вы никогда не будете использовать больше, чем по умолчанию (количество доступных процессоров). Недостаток: вы можете не получить "все процессоры", назначенные каждому параллельному потоку, который вы инициируете (если у вас их больше одного). (Очевидно, вы можете использовать ManagedBlocker, чтобы обойти это.)

Чтобы изменить способ выполнения параллельных потоков, вы можете либо

  • отправьте выполнение параллельного потока на свой собственный ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); или же
  • Вы можете изменить размер общего пула, используя системные свойства: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") для целевого параллелизма 20 потоков.

Пример последнего на моей машине, которая имеет 8 процессоров. Если я запускаю следующую программу:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

Выход:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

Таким образом, вы можете видеть, что параллельный поток обрабатывает 8 элементов одновременно, то есть использует 8 потоков. Однако, если я раскомментирую закомментированную строку, результат будет:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

На этот раз параллельный поток использовал 20 потоков, и все 20 элементов в потоке были обработаны одновременно.

В качестве альтернативы хитрости запуска параллельных вычислений внутри вашего собственного forkJoinPool, вы также можете передать этот пул в метод CompletableFuture.supplyAsync, например:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);

Использование ForkJoinPool и отправки для параллельного потока не позволяет надежно использовать все потоки. Если вы посмотрите на это ( параллельный поток из HashSet не работает параллельно) и на это ( почему параллельный поток не использует все потоки ForkJoinPool?), Вы увидите причину.

Краткая версия: если ForkJoinPool/submit не работает для вас, используйте

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");

Мы можем изменить параллелизм по умолчанию, используя следующее свойство:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

который может быть настроен на использование большего параллелизма.

Чтобы измерить фактическое количество используемых потоков, вы можете проверить Thread.activeCount():

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

Это может привести к 4-ядерному процессору, например:

5 // common pool
23 // custom pool

Без .parallel() это дает:

3 // common pool
4 // custom pool

До сих пор я использовал решения, описанные в ответах на этот вопрос. Для этого я разработал небольшую библиотеку под названием " Поддержка параллельного потока":

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

Но, как отметил @PabloMatiasGomez в комментариях, существуют недостатки в отношении механизма разделения параллельных потоков, который сильно зависит от размера общего пула. См. Параллельный поток из HashSet не работает параллельно.

Я использую это решение только для того, чтобы иметь отдельные пулы для разных типов работы, но я не могу установить размер общего пула равным 1, даже если я его не использую.

Примечание. Похоже, в JDK 10 реализовано исправление, обеспечивающее использование ожидаемого количества потоков в пуле пользовательских потоков.

Параллельное выполнение потока в пользовательском ForkJoinPool должно подчиняться параллелизму https://bugs.openjdk.java.net/browse/JDK-8190974

Если вы не хотите полагаться на хаки реализации, всегда есть способ добиться того же самого путем реализации пользовательских коллекторов, которые будут объединять map а также collect семантика... и вы не будете ограничены ForkJoinPool:

list.stream()
  .collect(parallelToList(i -> fetchFromDb(i), executor))
  .join()

К счастью, это уже сделано и доступно на Maven Central: http://github.com/pivovarit/parallel-collectors

Отказ от ответственности: я написал это и беру на себя ответственность за это.

Принятый (в настоящее время) ответ частично неверен. Недостаточно просто _submit()параллельный поток в выделенный пул fork-join. В этом случае поток будет использовать потоки этого пула и, кроме того , общий пул fork-join и даже вызывающий поток для обработки рабочей нагрузки потока, похоже, до размера общего пула fork-join. Поведение немного странное, но определенно не то, что требуется.

Чтобы на самом деле полностью ограничить работу выделенным пулом, вы должны инкапсулировать его вCompletableFuture:

      final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
  forkJoinPool = new ForkJoinPool(parallelism);
  final List<Integer> primes = CompletableFuture.supplyAsync(() -> 
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList()),
    forkJoinPool)  // <- passes dedicated fork-join pool as executor
    .join();  // <- Wait for result from forkJoinPool
    System.out.println(primes);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

Этот код остается со всеми операциями вforkJoinPoolкак на Java 8u352, так и на Java 17.0.1.

Иди, чтобы получить AbacusUtil. Номер потока может быть указан для параллельного потока. Вот пример кода:

LongStream.range(4, 1_000_000).parallel(threadNum)...

Раскрытие информации: я разработчик AbacusUtil.

Я попробовал пользовательский ForkJoinPool следующим образом, чтобы настроить размер пула:

private static Set<String> ThreadNameSet = new HashSet<>();
private static Callable<Long> getSum() {
    List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList());
    return () -> aList.parallelStream()
            .peek((i) -> {
                String threadName = Thread.currentThread().getName();
                ThreadNameSet.add(threadName);
            })
            .reduce(0L, Long::sum);
}

private static void testForkJoinPool() {
    final int parallelism = 10;

    ForkJoinPool forkJoinPool = null;
    Long result = 0L;
    try {
        forkJoinPool = new ForkJoinPool(parallelism);
        result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown(); //always remember to shutdown the pool
        }
    }
    out.println(result);
    out.println(ThreadNameSet);
}

Вот вывод о том, что пул использует больше потоков, чем по умолчанию 4.

50000005000000
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

Но на самом деле есть чудак, когда я пытался добиться того же результата, используя ThreadPoolExecutor следующее:

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000);
ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

но я потерпел неудачу.

Он только запустит параллельный поток в новом потоке, а затем все остальное будет таким же, что еще раз доказывает, что parallelStream будет использовать ForkJoinPool для запуска своих дочерних потоков.

Если вам не нужен пользовательский ThreadPool, но вы хотите ограничить количество одновременных задач, вы можете использовать:

List<Path> paths = List.of("/path/file1.csv", "/path/file2.csv", "/path/file3.csv").stream().map(e -> Paths.get(e)).collect(toList());
List<List<Path>> partitions = Lists.partition(paths, 4); // Guava method

partitions.forEach(group -> group.parallelStream().forEach(csvFilePath -> {
       // do your processing   
}));

(Дубликат вопроса об этом заблокирован, поэтому, пожалуйста, несите меня сюда)

Вот как я программно установил флаг максимального количества потоков, упомянутый выше, и фрагмент кода, чтобы убедиться, что параметр соблюдается.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2");
Set<String> threadNames = Stream.iterate(0, n -> n + 1)
  .parallel()
  .limit(100000)
  .map(i -> Thread.currentThread().getName())
  .collect(Collectors.toSet());
System.out.println(threadNames);

// Output -> [ForkJoinPool.commonPool-worker-1, Test worker, ForkJoinPool.commonPool-worker-3]

Я сделал служебный метод для параллельного запуска задачи с аргументом, который определяет максимальное количество потоков.

      public static void runParallel(final int maxThreads, Runnable task) throws RuntimeException {
    ForkJoinPool forkJoinPool = null;
    try {
        forkJoinPool = new ForkJoinPool(maxThreads);
        forkJoinPool.submit(task).get();
    } catch (InterruptedException | ExecutionException e) {
        throw new RuntimeException(e);
    } finally {
        if (forkJoinPool != null) {
            forkJoinPool.shutdown();
        }
    }
}

Это создаетForkJoinPoolс максимальным количеством разрешенных потоков, и он закрывает его после завершения задачи (или сбоя).

Использование следующее:

      final int maxThreads = 4;
runParallel(maxThreads, () -> 
    IntStream.range(1, 1_000_000).parallel()
            .filter(PrimesPrint::isPrime)
            .boxed().collect(Collectors.toList()));

Если вы не возражаете против использования сторонней библиотеки, с помощью cyclops-реагировать вы можете смешивать последовательные и параллельные потоки в одном конвейере и предоставлять пользовательские ForkJoinPools. Например

 ReactiveSeq.range(1, 1_000_000)
            .foldParallel(new ForkJoinPool(10),
                          s->s.filter(i->true)
                              .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))
                              .max(Comparator.naturalOrder()));

Или, если мы хотим продолжить обработку в последовательном потоке

 ReactiveSeq.range(1, 1_000_000)
            .parallel(new ForkJoinPool(10),
                      s->s.filter(i->true)
                          .peek(i->System.out.println("Thread " + Thread.currentThread().getId())))
            .map(this::processSequentially)
            .forEach(System.out::println);

[Раскрытие Я ведущий разработчик циклоп-реакции]

Вы можете попробовать реализовать этот ForkJoinWorkerThreadFactory и внедрить его в класс Fork-Join.

public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }

для этого вы можете использовать этот конструктор Fork-Join pool.

примечания:- 1. если вы используете это, примите во внимание, что в зависимости от вашей реализации новых потоков будет затронуто планирование из JVM, которое обычно распределяет потоки fork-join для разных ядер (рассматриваемых как вычислительный поток). 2. Планирование задач с помощью fork-join к потокам не пострадает. 3. Не совсем понял, как параллельный поток выбирает потоки из fork-join(не смог найти соответствующую документацию по нему), поэтому попробуйте использовать другую фабрику threadNaming, чтобы убедиться, что потоки в параллельном потоке выбираются из предоставленного вами customThreadFactory. 4. commonThreadPool не будет использовать этот customThreadFactory.

Другие вопросы по тегам