Должен ли параллелизм StreamEx работать при использовании takeWhile?
У меня есть поток, который я создаю так:
StreamEx.generate(new MySupplier<List<Entity>>())
.flatMap(List::stream)
.map(Entity::getName)
.map(name -> ...)
.. // more stuff
Я могу изменить это, чтобы работать параллельно, просто добавив parallel
:
StreamEx.generate(new MySupplier<List<Entity>>())
.flatMap(List::stream)
.map(Entity::getName)
.map(name -> ...)
.parallel()
.. // more stuff
Но я также хочу добавить takeWhile
условие остановки потока:
StreamEx.generate(new MySupplier<List<Entity>>())
.takeWhile(not(List::isEmpty))
.flatMap(List::stream)
.map(Entity::getName)
.map(name -> ...)
.parallel()
.. // more stuff
Но как только я добавлю takeWhile
кажется, что поток становится последовательным (по крайней мере, он обрабатывается только одним потоком). В соответствии с Javadoc takeWhile
, если я правильно понимаю, должны работать с параллельными потоками. Я что-то не так делаю или это по дизайну?
1 ответ
Как и в обычном Stream API, если что-то работает параллельно, это не значит, что оно работает эффективно. В документе говорится, что:
Хотя эта операция довольно дешевая для последовательного потока, она может быть довольно дорогой для параллельных конвейеров.
На самом деле вы хотите использовать takeWhile
с неупорядоченным потоком, который может быть специально оптимизирован, но в настоящее время не оптимизирован, так что это можно рассматривать как дефект. Я постараюсь это исправить (я автор StreamEx).
Обновление: исправлено в версии 0.6.5