Это ошибка в Files.lines(), или я что-то неправильно понимаю о параллельных потоках?
Среда: Ubuntu x86_64 (14.10), Oracle JDK 1.8u25
Я пытаюсь использовать параллельный поток Files.lines()
но я хочу .skip()
первая строка (это файл CSV с заголовком). Поэтому я пытаюсь сделать это:
try (
final Stream<String> stream = Files.lines(thePath, StandardCharsets.UTF_8)
.skip(1L).parallel();
) {
// etc
}
Но тогда один столбец не смог разобрать int...
Поэтому я попробовал простой код. Файл вопрос очень прост:
$ cat info.csv
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
1422758875023;34;54;151;4375;4375;27486
$
И код одинаково прост:
public static void main(final String... args)
{
final Path path = Paths.get("/home/fge/tmp/dd/info.csv");
Files.lines(path, StandardCharsets.UTF_8).skip(1L).parallel()
.forEach(System.out::println);
}
И я систематически получаю следующий результат (хорошо, я только что-то запускал около 20 раз):
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
Что мне здесь не хватает?
РЕДАКТИРОВАТЬ Кажется, что проблема, или недоразумение, гораздо более коренятся, чем это (два приведенных ниже примера были подготовлены сотрудником FreeNode ##java):
public static void main(final String... args)
{
new BufferedReader(new StringReader("Hello\nWorld")).lines()
.skip(1L).parallel()
.forEach(System.out::println);
final Iterator<String> iter
= Arrays.asList("Hello", "World").iterator();
final Spliterator<String> spliterator
= Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED);
final Stream<String> s
= StreamSupport.stream(spliterator, true);
s.skip(1L).forEach(System.out::println);
}
Это печатает:
Hello
Hello
Э-э.
@Holger предположил, что это происходит для любого потока, который ORDERED
и не SIZED
с этим другим примером:
Stream.of("Hello", "World")
.filter(x -> true)
.parallel()
.skip(1L)
.forEach(System.out::println);
Кроме того, это вытекает из всей дискуссии, которая уже состоялась, что проблема (если она одна?) С .forEach()
(как впервые указал @SotiriosDelimanolis).
5 ответов
Поскольку текущее состояние вопроса совершенно противоположно предыдущим заявлениям, сделанным здесь, следует отметить, что в настоящее время Брайан Гетц высказал явное утверждение о обратном распространении неупорядоченной характеристики за skip
операция считается ошибкой. Также утверждается, что теперь считается, что он вообще не имеет обратного распространения упорядоченности терминальной операции.
Также имеется отчет об ошибке JDK-8129120, статус которого "фиксирован в Java 9" и перенесён в Java 8, обновление 60
Я сделал несколько тестов с jdk1.8.0_60
и кажется, что реализация сейчас действительно демонстрирует более интуитивное поведение.
ЭТО ОТВЕТ УСТАРЕЛ - ПРОЧИТАЙТЕ ЭТО ОДИН РАЗ!
Чтобы быстро ответить на вопрос: наблюдаемое поведение предназначено! Там нет ошибки, и все происходит в соответствии с документацией. Но позвольте сказать, что это поведение должно быть задокументировано и передано лучше. Следует сделать более очевидным, как forEach
игнорирует порядок.
Сначала я расскажу о концепциях, которые допускают наблюдаемое поведение. Это обеспечивает основу для анализа одного из примеров, приведенных в вопросе. Я сделаю это на высоком уровне, а затем снова на очень низком уровне.
[TL; DR: читайте сами, объяснение высокого уровня даст грубый ответ.]
концепция
Вместо того чтобы говорить о Stream
s, который является типом, используемым или возвращаемым потоковыми методами, давайте поговорим о потоковых операциях и потоковых конвейерах. Вызов метода lines
, skip
а также parallel
являются потоковыми операциями, которые строят потоковый конвейер [1] и - как уже отмечали другие - этот конвейер обрабатывается как единое целое при работе терминала forEach
называется [2].
Конвейер может рассматриваться как последовательность операций, которые одна за другой выполняются над всем потоком (например, фильтровать все элементы, отображать оставшиеся элементы в числа, суммировать все числа). Но это вводит в заблуждение! Лучшей метафорой является то, что терминальная операция протягивает отдельные элементы через каждую операцию [3] (например, получить следующий нефильтрованный элемент, отобразить его, добавить его к сумме, запросить следующий элемент). Некоторые промежуточные операции, возможно, должны пройти несколько (например, skip
или, может быть, даже все (например, sort
) элементов, прежде чем они смогут вернуть запрошенный следующий элемент, и это один из источников для состояния в операции.
Каждая операция сигнализирует свои характеристики с этими StreamOpFlag
s:
DISTINCT
SORTED
ORDERED
SIZED
SHORT_CIRCUIT
Они объединяются в источнике потока, промежуточных операциях и работе терминала и образуют характеристики конвейера (в целом), которые затем используются для оптимизации [4]. Точно так же, выполняется ли конвейер параллельно или нет, является свойством всего конвейера [5].
Поэтому, когда вы делаете предположения относительно этих характеристик, вы должны внимательно следить за всеми операциями построения конвейера, независимо от того, в каком порядке они применяются, и какие гарантии они дают. При этом имейте в виду, как операция терминала протягивает каждый отдельный элемент через конвейер.
пример
Давайте посмотрим на этот особый случай:
BufferedReader fooBarReader = new BufferedReader(new StringReader("Foo\nBar"));
fooBarReader.lines()
.skip(1L)
.parallel()
.forEach(System.out::println);
Высокий уровень
Независимо от того, упорядочен ли ваш поток источника или нет (он есть), вызывая forEach
(вместо forEachOrdered
) вы заявляете, что порядок не имеет значения для вас [6], что эффективно снижает skip
от "пропустить первые n элементов" до "пропустить любые n элементов"[7] (потому что без порядка первое становится бессмысленным).
Таким образом, вы даете конвейеру право игнорировать порядок, если это обещает ускорение. Для параллельного выполнения это, по-видимому, так, поэтому вы получаете наблюдаемый результат. Следовательно, то, что вы наблюдаете, - это предполагаемое поведение, а не ошибка.
Обратите внимание, что это не противоречит skip
быть состоящим Как описано выше, состояние с сохранением состояния не означает, что он каким-то образом кэширует весь поток (за исключением пропущенных элементов), и все последующее выполняется на этих элементах. Это просто означает, что операция имеет какое-то состояние, а именно количество пропущенных элементов (ну, на самом деле это не так просто, но с моим ограниченным пониманием того, что происходит, я бы сказал, что это справедливое упрощение).
Низкий уровень
Давайте посмотрим на это более подробно:
BufferedReader.lines
создаетStream
Давайте назовем это_lines
:- создает заказанный
Spliterator
- руки это
StreamSupport.stream
, который создаетReferencePipeline.Head
и преобразует флаг сплитератора в флаг операции потока
- создает заказанный
.skip
создает новыйStream
давайте назовем это_skip
:- звонки
ReferencePipeline.skip
- которая создает операцию "среза" (обобщение пропуска и лимита) с
SliceOps.makeRef
- это создает анонимный экземпляр
ReferencePipeline.StatefulOp
какие ссылки_lines
как его источник
- звонки
.parallel
устанавливает флаг параллели для всего конвейера, как описано выше.forEach
на самом деле начинает выполнение
Итак, давайте посмотрим, как выполняется конвейер:
- призвание
_skip.forEach
создаетForEachOp
(давайте назовем это_forEach
) и передает его_skip.evaluate
, который делает две вещи:- звонки
sourceSpliterator
создать сплитератор вокруг источника для этой стадии конвейера:- звонки
opEvaluateParallelLazy
на себя (как выясняется) - это определяет, что поток неупорядочен и создает
UnorderedSliceSpliterator
(давайте назовем это_sliceSpliterator
) сskip = 1
и нет предела.
- звонки
- звонки
_forEach.evaluateParallel
который создаетForEachTask
(потому что это неупорядоченный; давайте назовем это_forEachTask
) и вызывает его
- звонки
- В
_forEachTask.compute
задача разделяет первые 1024 строки, создает для нее новую задачу (назовем ее_forEachTask2
), понимает, что не осталось ни одной строки и заканчивает. - Внутри бассейна вилки,
_forEachTask2.compute
вызывается, тщетно пытается снова разделиться и, наконец, начинает копировать свои элементы в приемник (обертка с поддержкой потока вокругSystem.out.println
) позвонив_skip.copyInto
, - Это по существу делегирует задачу указанному сплитератору. Это
_sliceSpliterator
который был создан выше! Так_sliceSpliterator.forEachRemaining
отвечает за передачу не пропущенных элементов в println-sink:- он получает часть (в данном случае все) строк в буфер и считает их
- он пытается запросить столько разрешений (я полагаю, из-за распараллеливания) через
acquirePermits
- с двумя элементами в источнике и одним, который нужно пропустить, есть только одно разрешение, которое он получает (в общем, скажем, n)
- он позволяет буферу помещать в приемник первые n элементов (в данном случае только первый)
Так UnorderedSliceSpliterator.OfRef.forEachRemaining
где заказ окончательно и действительно игнорируется. Я не сравнивал это с заказанным вариантом, но это мое предположение, почему это делается так:
- при распараллеливании засовывание элементов сплитератора в буфер может чередоваться с другими задачами, выполняющими то же самое
- это сделает отслеживание их заказа чрезвычайно трудным
- делать это или предотвращать чередование снижает производительность и бессмысленно, если порядок не имеет значения
- если порядок потерян, то ничего другого не остается, кроме как обработать первые n разрешенных элементов
Любые вопросы?;) Извините, что так долго. Возможно я должен пропустить детали и сделать сообщение в блоге об этом....
источники
[1] java.util.stream
- Потоковые операции и трубопроводы:
Потоковые операции делятся на промежуточные и терминальные операции и объединяются в потоковые конвейеры.
[2] java.util.stream
- Потоковые операции и трубопроводы:
Обход источника конвейера не начинается, пока не будет выполнена терминальная операция конвейера.
[3] Эта метафора представляет мое понимание потоков. Основным источником, помимо кода, является эта цитата из java.util.stream
- Поток операций и трубопроводов (выделение шахты):
Обработка потоков лениво обеспечивает значительную эффективность; в конвейере, таком как приведенный выше пример filter-map-sum, фильтрация, отображение и суммирование могут быть объединены в один проход данных с минимальным промежуточным состоянием. Лень также позволяет избежать проверки всех данных, когда в этом нет необходимости; для таких операций, как "найти первую строку длиной более 1000 символов", необходимо только изучить достаточно строк, чтобы найти ту, которая обладает желаемыми характеристиками, без проверки всех строк, доступных из источника.
[4] java.util.stream.StreamOpFlag
:
На каждом этапе конвейера можно вычислить объединенные флаги потока и операций [... jadda, jadda, jadda о том, как флаги объединяются в исходных, промежуточных и терминальных операциях...] для получения выходных флагов из конвейера. Эти флаги могут затем использоваться для применения оптимизаций.
В коде вы можете увидеть это в AbstractPipeline.combinedFlags
, который устанавливается во время построения (и в нескольких других случаях) путем объединения флага предыдущей и новой операции.
[5] java.util.stream
- Параллелизм (с которым я не могу напрямую связать - прокрутите немного вниз):
Когда операция терминала инициируется, конвейер потока выполняется последовательно или параллельно в зависимости от ориентации потока, в котором он вызывается.
В коде вы можете увидеть это в AbstractPipeline.sequential
, parallel
, а также isParallel
, который устанавливает / проверяет логический флаг в источнике потока, делая его неактуальным при вызове сеттеров при создании потока.
[6] java.util.stream.Stream.forEach:
Выполняет действие для каждого элемента этого потока. [...] Поведение этой операции явно недетерминировано.
Сравните это с java.util.stream.Stream.forEachOrdered:
Выполняет действие для каждого элемента этого потока в порядке встречи потока, если поток имеет определенный порядок встречи.
[7] Это также не ясно задокументировано, но моя интерпретация этого комментария Stream.skip
(сильно сокращен мной):
[...] skip() [...] может быть довольно дорогим для упорядоченных параллельных конвейеров [...], так как skip (n) ограничен пропуском не только n элементов, но и первых n элементов в порядке встречи, [...] [R] удаление ограничения порядка [...] может привести к значительному ускорению skip() в параллельных конвейерах
Проблема в том, что вы используете параллельный поток вместе с forEach и ожидаете, что действие пропуска зависит от правильного порядка элементов, что здесь не так. Выдержка из документации forEach:
Для параллельных потоковых конвейеров эта операция не гарантирует соблюдение порядка встречи потока, так как это принесет ущерб преимуществам параллелизма.
Я предполагаю, что в основном происходит то, что операция пропуска сначала выполняется во второй строке, а не в первой. Если вы сделаете поток последовательным или используете forEachOrdered, вы увидите, что тогда он даст ожидаемый результат. Другой подход заключается в использовании коллекторов.
Позвольте мне процитировать что-то актуальное - Javadoc of skip
:
Хотя skip() обычно является дешевой операцией для последовательных потоковых конвейеров, она может быть довольно дорогой для упорядоченных параллельных конвейеров, особенно для больших значений n, поскольку skip(n) ограничено пропуском не только n элементов, но и первых n элементы в порядке встречи.
Теперь, вполне уверен, что Files.lines()
имеет четко определенный порядок встреч и является ORDERED
stream (если бы не было, даже в последовательной операции не было бы никакой гарантии, что порядок встречи соответствует порядку файла), поэтому гарантируется, что результирующий поток будет детерминистически состоять только из второй строки в вашем примере.
Независимо от того, есть ли что-то еще, гарантия, безусловно, есть.
У меня есть идея, как обойти эту проблему, которую я не вижу в предыдущих обсуждениях. Вы можете воссоздать поток, разделив конвейер на два конвейера, сохраняя ленивость целиком.
public static <T> Stream<T> recreate(Stream<T> stream) {
return StreamSupport.stream(stream.spliterator(), stream.isParallel())
.onClose(stream::close);
}
public static void main(String[] args) {
recreate(new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5")).lines()
.skip(1).parallel()).forEach(System.out::println);
}
Когда вы воссоздаете поток из начального потокового сплитератора, вы фактически создаете новый конвейер. В большинстве случаев recreate
будет работать как no-op
Но дело в том, что первый и второй конвейеры не разделяют parallel
а также unordered
состояния. Так что даже если вы используете forEach
(или любая другая неупорядоченная операция терминала), только второй поток становится неупорядоченным.
Внутренне довольно похожая вещь объединяет ваш поток с пустым потоком:
Stream.concat(Stream.empty(),
new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5"))
.lines().skip(1).parallel()).forEach(System.out::println);
Хотя это немного больше накладных расходов.