StreamEx.parallel(). ForEach() не работает параллельно после.map()
Я заметил, что если я использую библиотеку StreamEx для распараллеливания моих потоков с пользовательским ForkJoinPool, как показано ниже - последующие действия выполняются в параллельных потоках из этого пула. Однако, если я добавлю операцию map () и параллельно полученному потоку - используется только один поток из пула.
Ниже приведен полный код (без импорта) минимального рабочего примера, демонстрирующего эту проблему. Единственная разница между методами executeAsParallelFromList () и executeAsParallelAfterMap () заключается в добавлении вызова.map (...) перед.parallel ().
import one.util.streamex.StreamEx;
public class ParallelExample {
private static final Logger logger = LoggerFactory.getLogger(ParallelExample.class);
private static ForkJoinPool s3ThreadPool = new ForkJoinPool(3);
public static List<String> getTestList(){
int listSize = 10;
List<String> testList = new ArrayList<>();
for (int i=0; i<listSize; i++)
testList.add("item_" + i);
return testList;
}
public static void executeAsParallelFromList(){
logger.info("executeAsParallelFromList():");
List<String> testList = getTestList();
StreamEx<String> streamOfItems = StreamEx
.of(testList)
.parallel(s3ThreadPool);
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}
public static void executeAsParallelAfterMap(){
logger.info("executeAsParallelAfterMap():");
List<String> testList = getTestList();
StreamEx<String> streamOfItems = StreamEx
.of(testList)
.map(item -> item+"_mapped")
.parallel(s3ThreadPool);
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}
private static void handleItem(String item){
// do something with the item - just print for now
logger.info("I'm handling item: {}", item);
}
}
Модульный тест для выполнения обоих методов:
public class ParallelExampleTest {
@Test
public void testExecuteAsParallelFromList() {
ParallelExample.executeAsParallelFromList();
}
@Test
public void testExecuteAsParallelFromStreamEx() {
ParallelExample.executeAsParallelAfterMap();
}
}
Результаты исполнения:
08:49:12.992 [main] INFO marina.streams.ParallelExample - executeAsParallelFromList():
08:49:13.002 [main] INFO marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.040 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_6
08:49:13.040 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_2
08:49:13.040 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_1
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_4
08:49:13.041 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_8
08:49:13.041 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_0
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_3
08:49:13.041 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_9
08:49:13.041 [ForkJoinPool-1-worker-3] INFO marina.streams.ParallelExample - I'm handling item: item_5
08:49:13.041 [ForkJoinPool-1-worker-2] INFO marina.streams.ParallelExample - I'm handling item: item_7
08:49:13.043 [main] INFO marina.streams.ParallelExample - executeAsParallelAfterMap():
08:49:13.043 [main] INFO marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_0_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_1_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_2_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_3_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_4_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_5_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_6_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_7_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_8_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO marina.streams.ParallelExample - I'm handling item: item_9_mapped
Как вы видите, все три потока используются при выполнении executeAsParallelFromList (), но только один поток используется при выполнении executeAsParallelAfterMap ().
Зачем?
Спасибо!
Марина
ПРИМЕЧАНИЕ: пример намеренно упрощен - я постарался сделать его как можно меньше, чтобы продемонстрировать проблему. Очевидно, что в реальной жизни в map (), handleItem () и т. Д. Происходит гораздо больше, а входные данные гораздо интереснее (я пытаюсь параллельно обрабатывать сегменты / префиксы AWS S3).
2 ответа
Проблема в том, что как только вы позвоните map(...)
Метод StreamEx создает базовый поток Java 8 с последовательной / параллельной конфигурацией, начиная с этой точки (то есть с последовательной), и вызывает parallel(...)
после этого не появляется возможность обновления базового потока Java 8.
Решение зависит от того, чего вы пытаетесь достичь. Если ты счастлив за свой map(...)
Операция будет выполняться параллельно, а затем просто переместите parallel(...)
операция, так что это первое, что после of(...)
,
Однако, если вы хотите, чтобы некоторые операции выполнялись последовательно, перед некоторыми параллельными операциями, вам лучше использовать два потока. Например, следуя стилю вашего примера кода:
public static void executeAsParallelAfterMapV2() {
logger.info("executeAsParallelAfterMapV2():");
List<String> testList = getTestList();
StreamEx<String> sequentialStream = StreamEx
.of(testList)
.map(item -> {
logger.info("Mapping {}", item);
return item + "_mapped";
});
logger.info("sequentialStream.isParallel(): {}", sequentialStream.isParallel());
List<String> afterSequentialProcessing = sequentialStream.toList();
StreamEx<String> streamOfItems = StreamEx.of(afterSequentialProcessing)
.parallel(s3ThreadPool);
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}
Это дает что-то вроде:
20:43:36.835 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapV2():
20:43:36.883 [main] INFO scott.streams.ParallelExample - sequentialStream.isParallel(): false
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_0
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_1
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_2
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_3
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_4
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_5
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_6
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_7
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_8
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_9
20:43:36.886 [main] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:43:36.889 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:43:36.889 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped
В сторону...
Из интереса, если вы создаете поток Java 8 напрямую (без использования StreamEx) и помещаете parallel()
операция ниже map(...)
затем он обновляет тип (целого) потока, чтобы он был параллельным:
public static void executeAsParallelAfterMapJava8Stream() throws InterruptedException {
logger.info("executeAsParallelAfterMapJava8Stream():");
List<String> testList = getTestList();
s3ThreadPool.submit(() -> {
Stream<String> streamOfItems = testList.stream()
.map(item -> {
logger.info("Mapping {}", item);
return item + "_mapped";
})
.parallel();
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}).join();
}
Если вы создадите похожий модульный тест, вы получите нечто похожее на:
20:36:23.469 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapJava8Stream():
20:36:23.517 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_6
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_2
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_8
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_5
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_4
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_9
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_1
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_3
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_7
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:36:23.521 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:36:23.521 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_0
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped
Простой ответ: это ошибка. Я подал и исправил это. Это было упущено тестами, поскольку тесты только проверяют, что все операции выполняются в указанном пуле, но не проверяют, используются ли разные потоки пула (иногда это нормально, если распараллеливание не работает, например, для потока только из одного элемента).
Исправление доступно в версии 0.6.4. В предыдущих выпусках для обхода проблемы вы могли бы рассмотреть возможность использования .parallel().parallel(fjp)
: это должно правильно распараллеливать.
Пожалуйста, рассмотрите возможность сообщения о проблемах StreamEx на официальном трекере проблем StreamEx. В эти дни я посещаю Stackru только изредка, поэтому могу пропустить проблемы, о которых здесь сообщалось.