StreamEx.parallel(). ForEach() не работает параллельно после.map()

Я заметил, что если я использую библиотеку StreamEx для распараллеливания моих потоков с пользовательским ForkJoinPool, как показано ниже - последующие действия выполняются в параллельных потоках из этого пула. Однако, если я добавлю операцию map () и параллельно полученному потоку - используется только один поток из пула.

Ниже приведен полный код (без импорта) минимального рабочего примера, демонстрирующего эту проблему. Единственная разница между методами executeAsParallelFromList () и executeAsParallelAfterMap () заключается в добавлении вызова.map (...) перед.parallel ().

import one.util.streamex.StreamEx;

public class ParallelExample {

private static final Logger logger = LoggerFactory.getLogger(ParallelExample.class);
private static ForkJoinPool s3ThreadPool = new ForkJoinPool(3);

public static List<String> getTestList(){
    int listSize = 10;
    List<String> testList = new ArrayList<>();
    for (int i=0; i<listSize; i++)
        testList.add("item_" + i);
    return testList;
}

public static void executeAsParallelFromList(){
    logger.info("executeAsParallelFromList():");
    List<String> testList = getTestList();
    StreamEx<String> streamOfItems = StreamEx
            .of(testList)
            .parallel(s3ThreadPool);
    logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
    streamOfItems.forEach(item -> handleItem(item));
}

public static void executeAsParallelAfterMap(){
    logger.info("executeAsParallelAfterMap():");
    List<String> testList = getTestList();
    StreamEx<String> streamOfItems = StreamEx
            .of(testList)
            .map(item -> item+"_mapped")
            .parallel(s3ThreadPool);
    logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
    streamOfItems.forEach(item -> handleItem(item));
}

private static void handleItem(String item){
    // do something with the item - just print for now
    logger.info("I'm handling item: {}", item);
}

}

Модульный тест для выполнения обоих методов:

public class ParallelExampleTest {

@Test
public void testExecuteAsParallelFromList() {
    ParallelExample.executeAsParallelFromList();
}

@Test
public void testExecuteAsParallelFromStreamEx() {
    ParallelExample.executeAsParallelAfterMap();
}

}

Результаты исполнения:

08:49:12.992 [main] INFO  marina.streams.ParallelExample - executeAsParallelFromList():
08:49:13.002 [main] INFO  marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.040 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_6
08:49:13.040 [ForkJoinPool-1-worker-2] INFO  marina.streams.ParallelExample - I'm handling item: item_2
08:49:13.040 [ForkJoinPool-1-worker-3] INFO  marina.streams.ParallelExample - I'm handling item: item_1
08:49:13.041 [ForkJoinPool-1-worker-2] INFO  marina.streams.ParallelExample - I'm handling item: item_4
08:49:13.041 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_8
08:49:13.041 [ForkJoinPool-1-worker-3] INFO  marina.streams.ParallelExample - I'm handling item: item_0
08:49:13.041 [ForkJoinPool-1-worker-2] INFO  marina.streams.ParallelExample - I'm handling item: item_3
08:49:13.041 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_9
08:49:13.041 [ForkJoinPool-1-worker-3] INFO  marina.streams.ParallelExample - I'm handling item: item_5
08:49:13.041 [ForkJoinPool-1-worker-2] INFO  marina.streams.ParallelExample - I'm handling item: item_7

08:49:13.043 [main] INFO  marina.streams.ParallelExample - executeAsParallelAfterMap():
08:49:13.043 [main] INFO  marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_0_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_1_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_2_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_3_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_4_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_5_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_6_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_7_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_8_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_9_mapped

Как вы видите, все три потока используются при выполнении executeAsParallelFromList (), но только один поток используется при выполнении executeAsParallelAfterMap ().

Зачем?

Спасибо!

Марина

ПРИМЕЧАНИЕ: пример намеренно упрощен - я постарался сделать его как можно меньше, чтобы продемонстрировать проблему. Очевидно, что в реальной жизни в map (), handleItem () и т. Д. Происходит гораздо больше, а входные данные гораздо интереснее (я пытаюсь параллельно обрабатывать сегменты / префиксы AWS S3).

2 ответа

Решение

Проблема в том, что как только вы позвоните map(...) Метод StreamEx создает базовый поток Java 8 с последовательной / параллельной конфигурацией, начиная с этой точки (то есть с последовательной), и вызывает parallel(...) после этого не появляется возможность обновления базового потока Java 8.

Решение зависит от того, чего вы пытаетесь достичь. Если ты счастлив за свой map(...) Операция будет выполняться параллельно, а затем просто переместите parallel(...) операция, так что это первое, что после of(...),

Однако, если вы хотите, чтобы некоторые операции выполнялись последовательно, перед некоторыми параллельными операциями, вам лучше использовать два потока. Например, следуя стилю вашего примера кода:

public static void executeAsParallelAfterMapV2() {
    logger.info("executeAsParallelAfterMapV2():");
    List<String> testList = getTestList();
    StreamEx<String> sequentialStream = StreamEx
            .of(testList)
            .map(item -> {
                logger.info("Mapping {}", item);
                return item + "_mapped";
            });
    logger.info("sequentialStream.isParallel(): {}", sequentialStream.isParallel());

    List<String> afterSequentialProcessing = sequentialStream.toList();
    StreamEx<String> streamOfItems = StreamEx.of(afterSequentialProcessing)
            .parallel(s3ThreadPool);
    logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
    streamOfItems.forEach(item -> handleItem(item));
}

Это дает что-то вроде:

20:43:36.835 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapV2():
20:43:36.883 [main] INFO scott.streams.ParallelExample - sequentialStream.isParallel(): false
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_0
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_1
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_2
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_3
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_4
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_5
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_6
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_7
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_8
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_9
20:43:36.886 [main] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:43:36.889 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:43:36.889 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped

В сторону...

Из интереса, если вы создаете поток Java 8 напрямую (без использования StreamEx) и помещаете parallel() операция ниже map(...)затем он обновляет тип (целого) потока, чтобы он был параллельным:

public static void executeAsParallelAfterMapJava8Stream() throws InterruptedException {
    logger.info("executeAsParallelAfterMapJava8Stream():");
    List<String> testList = getTestList();

    s3ThreadPool.submit(() -> {
        Stream<String> streamOfItems = testList.stream()
                .map(item -> {
                    logger.info("Mapping {}", item);
                    return item + "_mapped";
                })
                .parallel();
        logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
        streamOfItems.forEach(item -> handleItem(item));
    }).join();
}

Если вы создадите похожий модульный тест, вы получите нечто похожее на:

20:36:23.469 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapJava8Stream():
20:36:23.517 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_6
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_2
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_8
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_5
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_4
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_9
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_1
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_3
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_7
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:36:23.521 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:36:23.521 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_0
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped

Простой ответ: это ошибка. Я подал и исправил это. Это было упущено тестами, поскольку тесты только проверяют, что все операции выполняются в указанном пуле, но не проверяют, используются ли разные потоки пула (иногда это нормально, если распараллеливание не работает, например, для потока только из одного элемента).

Исправление доступно в версии 0.6.4. В предыдущих выпусках для обхода проблемы вы могли бы рассмотреть возможность использования .parallel().parallel(fjp): это должно правильно распараллеливать.

Пожалуйста, рассмотрите возможность сообщения о проблемах StreamEx на официальном трекере проблем StreamEx. В эти дни я посещаю Stackru только изредка, поэтому могу пропустить проблемы, о которых здесь сообщалось.

Другие вопросы по тегам