Должен ли параллелизм StreamEx работать при использовании takeWhile?

У меня есть поток, который я создаю так:

StreamEx.generate(new MySupplier<List<Entity>>())
        .flatMap(List::stream)
        .map(Entity::getName)
        .map(name -> ...)
        .. // more stuff

Я могу изменить это, чтобы работать параллельно, просто добавив parallel:

StreamEx.generate(new MySupplier<List<Entity>>())
        .flatMap(List::stream)
        .map(Entity::getName)
        .map(name -> ...)
        .parallel()
        .. // more stuff

Но я также хочу добавить takeWhile условие остановки потока:

StreamEx.generate(new MySupplier<List<Entity>>())
        .takeWhile(not(List::isEmpty))
        .flatMap(List::stream)
        .map(Entity::getName)
        .map(name -> ...)
        .parallel()
        .. // more stuff

Но как только я добавлю takeWhile кажется, что поток становится последовательным (по крайней мере, он обрабатывается только одним потоком). В соответствии с Javadoc takeWhile, если я правильно понимаю, должны работать с параллельными потоками. Я что-то не так делаю или это по дизайну?

1 ответ

Решение

Как и в обычном Stream API, если что-то работает параллельно, это не значит, что оно работает эффективно. В документе говорится, что:

Хотя эта операция довольно дешевая для последовательного потока, она может быть довольно дорогой для параллельных конвейеров.

На самом деле вы хотите использовать takeWhile с неупорядоченным потоком, который может быть специально оптимизирован, но в настоящее время не оптимизирован, так что это можно рассматривать как дефект. Я постараюсь это исправить (я автор StreamEx).

Обновление: исправлено в версии 0.6.5

Другие вопросы по тегам