Это ошибка в Files.lines(), или я что-то неправильно понимаю о параллельных потоках?

Среда: Ubuntu x86_64 (14.10), Oracle JDK 1.8u25

Я пытаюсь использовать параллельный поток Files.lines() но я хочу .skip() первая строка (это файл CSV с заголовком). Поэтому я пытаюсь сделать это:

try (
    final Stream<String> stream = Files.lines(thePath, StandardCharsets.UTF_8)
        .skip(1L).parallel();
) {
    // etc
}

Но тогда один столбец не смог разобрать int...

Поэтому я попробовал простой код. Файл вопрос очень прост:

$ cat info.csv 
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
1422758875023;34;54;151;4375;4375;27486
$

И код одинаково прост:

public static void main(final String... args)
{
    final Path path = Paths.get("/home/fge/tmp/dd/info.csv");
    Files.lines(path, StandardCharsets.UTF_8).skip(1L).parallel()
        .forEach(System.out::println);
}

И я систематически получаю следующий результат (хорошо, я только что-то запускал около 20 раз):

startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes

Что мне здесь не хватает?


РЕДАКТИРОВАТЬ Кажется, что проблема, или недоразумение, гораздо более коренятся, чем это (два приведенных ниже примера были подготовлены сотрудником FreeNode ##java):

public static void main(final String... args)
{
    new BufferedReader(new StringReader("Hello\nWorld")).lines()
        .skip(1L).parallel()
        .forEach(System.out::println);

    final Iterator<String> iter
        = Arrays.asList("Hello", "World").iterator();
    final Spliterator<String> spliterator
        = Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED);
    final Stream<String> s
        = StreamSupport.stream(spliterator, true);

    s.skip(1L).forEach(System.out::println);
}

Это печатает:

Hello
Hello

Э-э.

@Holger предположил, что это происходит для любого потока, который ORDERED и не SIZED с этим другим примером:

Stream.of("Hello", "World")
    .filter(x -> true)
    .parallel()
    .skip(1L)
    .forEach(System.out::println);

Кроме того, это вытекает из всей дискуссии, которая уже состоялась, что проблема (если она одна?) С .forEach() (как впервые указал @SotiriosDelimanolis).

5 ответов

Решение

Поскольку текущее состояние вопроса совершенно противоположно предыдущим заявлениям, сделанным здесь, следует отметить, что в настоящее время Брайан Гетц высказал явное утверждение о обратном распространении неупорядоченной характеристики за skip операция считается ошибкой. Также утверждается, что теперь считается, что он вообще не имеет обратного распространения упорядоченности терминальной операции.

Также имеется отчет об ошибке JDK-8129120, статус которого "фиксирован в Java 9" и перенесён в Java 8, обновление 60

Я сделал несколько тестов с jdk1.8.0_60 и кажется, что реализация сейчас действительно демонстрирует более интуитивное поведение.

ЭТО ОТВЕТ УСТАРЕЛ - ПРОЧИТАЙТЕ ЭТО ОДИН РАЗ!


Чтобы быстро ответить на вопрос: наблюдаемое поведение предназначено! Там нет ошибки, и все происходит в соответствии с документацией. Но позвольте сказать, что это поведение должно быть задокументировано и передано лучше. Следует сделать более очевидным, как forEach игнорирует порядок.

Сначала я расскажу о концепциях, которые допускают наблюдаемое поведение. Это обеспечивает основу для анализа одного из примеров, приведенных в вопросе. Я сделаю это на высоком уровне, а затем снова на очень низком уровне.

[TL; DR: читайте сами, объяснение высокого уровня даст грубый ответ.]

концепция

Вместо того чтобы говорить о Stream s, который является типом, используемым или возвращаемым потоковыми методами, давайте поговорим о потоковых операциях и потоковых конвейерах. Вызов метода lines, skip а также parallel являются потоковыми операциями, которые строят потоковый конвейер [1] и - как уже отмечали другие - этот конвейер обрабатывается как единое целое при работе терминала forEach называется [2].

Конвейер может рассматриваться как последовательность операций, которые одна за другой выполняются над всем потоком (например, фильтровать все элементы, отображать оставшиеся элементы в числа, суммировать все числа). Но это вводит в заблуждение! Лучшей метафорой является то, что терминальная операция протягивает отдельные элементы через каждую операцию [3] (например, получить следующий нефильтрованный элемент, отобразить его, добавить его к сумме, запросить следующий элемент). Некоторые промежуточные операции, возможно, должны пройти несколько (например, skip или, может быть, даже все (например, sort) элементов, прежде чем они смогут вернуть запрошенный следующий элемент, и это один из источников для состояния в операции.

Каждая операция сигнализирует свои характеристики с этими StreamOpFlag s:

  • DISTINCT
  • SORTED
  • ORDERED
  • SIZED
  • SHORT_CIRCUIT

Они объединяются в источнике потока, промежуточных операциях и работе терминала и образуют характеристики конвейера (в целом), которые затем используются для оптимизации [4]. Точно так же, выполняется ли конвейер параллельно или нет, является свойством всего конвейера [5].

Поэтому, когда вы делаете предположения относительно этих характеристик, вы должны внимательно следить за всеми операциями построения конвейера, независимо от того, в каком порядке они применяются, и какие гарантии они дают. При этом имейте в виду, как операция терминала протягивает каждый отдельный элемент через конвейер.

пример

Давайте посмотрим на этот особый случай:

BufferedReader fooBarReader = new BufferedReader(new StringReader("Foo\nBar"));
fooBarReader.lines()
        .skip(1L)
        .parallel()
        .forEach(System.out::println);

Высокий уровень

Независимо от того, упорядочен ли ваш поток источника или нет (он есть), вызывая forEach (вместо forEachOrdered) вы заявляете, что порядок не имеет значения для вас [6], что эффективно снижает skip от "пропустить первые n элементов" до "пропустить любые n элементов"[7] (потому что без порядка первое становится бессмысленным).

Таким образом, вы даете конвейеру право игнорировать порядок, если это обещает ускорение. Для параллельного выполнения это, по-видимому, так, поэтому вы получаете наблюдаемый результат. Следовательно, то, что вы наблюдаете, - это предполагаемое поведение, а не ошибка.

Обратите внимание, что это не противоречит skip быть состоящим Как описано выше, состояние с сохранением состояния не означает, что он каким-то образом кэширует весь поток (за исключением пропущенных элементов), и все последующее выполняется на этих элементах. Это просто означает, что операция имеет какое-то состояние, а именно количество пропущенных элементов (ну, на самом деле это не так просто, но с моим ограниченным пониманием того, что происходит, я бы сказал, что это справедливое упрощение).

Низкий уровень

Давайте посмотрим на это более подробно:

  1. BufferedReader.lines создает Stream Давайте назовем это _lines:
    • создает заказанный Spliterator
    • руки это StreamSupport.stream, который создает ReferencePipeline.Head и преобразует флаг сплитератора в флаг операции потока
  2. .skip создает новый Stream давайте назовем это _skip:
    • звонки ReferencePipeline.skip
    • которая создает операцию "среза" (обобщение пропуска и лимита) с SliceOps.makeRef
    • это создает анонимный экземпляр ReferencePipeline.StatefulOp какие ссылки _lines как его источник
  3. .parallel устанавливает флаг параллели для всего конвейера, как описано выше
  4. .forEach на самом деле начинает выполнение

Итак, давайте посмотрим, как выполняется конвейер:

  1. призвание _skip.forEach создает ForEachOp (давайте назовем это _forEach) и передает его _skip.evaluate, который делает две вещи:
    1. звонки sourceSpliterator создать сплитератор вокруг источника для этой стадии конвейера:
      • звонки opEvaluateParallelLazy на себя (как выясняется)
      • это определяет, что поток неупорядочен и создает UnorderedSliceSpliterator (давайте назовем это _sliceSpliterator) с skip = 1 и нет предела.
    2. звонки _forEach.evaluateParallelкоторый создает ForEachTask (потому что это неупорядоченный; давайте назовем это _forEachTask) и вызывает его
  2. В _forEachTask.compute задача разделяет первые 1024 строки, создает для нее новую задачу (назовем ее _forEachTask2), понимает, что не осталось ни одной строки и заканчивает.
  3. Внутри бассейна вилки, _forEachTask2.compute вызывается, тщетно пытается снова разделиться и, наконец, начинает копировать свои элементы в приемник (обертка с поддержкой потока вокруг System.out.println) позвонив _skip.copyInto,
  4. Это по существу делегирует задачу указанному сплитератору. Это _sliceSpliterator который был создан выше! Так _sliceSpliterator.forEachRemaining отвечает за передачу не пропущенных элементов в println-sink:
    1. он получает часть (в данном случае все) строк в буфер и считает их
    2. он пытается запросить столько разрешений (я полагаю, из-за распараллеливания) через acquirePermits
    3. с двумя элементами в источнике и одним, который нужно пропустить, есть только одно разрешение, которое он получает (в общем, скажем, n)
    4. он позволяет буферу помещать в приемник первые n элементов (в данном случае только первый)

Так UnorderedSliceSpliterator.OfRef.forEachRemaining где заказ окончательно и действительно игнорируется. Я не сравнивал это с заказанным вариантом, но это мое предположение, почему это делается так:

  • при распараллеливании засовывание элементов сплитератора в буфер может чередоваться с другими задачами, выполняющими то же самое
  • это сделает отслеживание их заказа чрезвычайно трудным
  • делать это или предотвращать чередование снижает производительность и бессмысленно, если порядок не имеет значения
  • если порядок потерян, то ничего другого не остается, кроме как обработать первые n разрешенных элементов

Любые вопросы?;) Извините, что так долго. Возможно я должен пропустить детали и сделать сообщение в блоге об этом....

источники

[1] java.util.stream - Потоковые операции и трубопроводы:

Потоковые операции делятся на промежуточные и терминальные операции и объединяются в потоковые конвейеры.

[2] java.util.stream - Потоковые операции и трубопроводы:

Обход источника конвейера не начинается, пока не будет выполнена терминальная операция конвейера.

[3] Эта метафора представляет мое понимание потоков. Основным источником, помимо кода, является эта цитата из java.util.stream - Поток операций и трубопроводов (выделение шахты):

Обработка потоков лениво обеспечивает значительную эффективность; в конвейере, таком как приведенный выше пример filter-map-sum, фильтрация, отображение и суммирование могут быть объединены в один проход данных с минимальным промежуточным состоянием. Лень также позволяет избежать проверки всех данных, когда в этом нет необходимости; для таких операций, как "найти первую строку длиной более 1000 символов", необходимо только изучить достаточно строк, чтобы найти ту, которая обладает желаемыми характеристиками, без проверки всех строк, доступных из источника.

[4] java.util.stream.StreamOpFlag:

На каждом этапе конвейера можно вычислить объединенные флаги потока и операций [... jadda, jadda, jadda о том, как флаги объединяются в исходных, промежуточных и терминальных операциях...] для получения выходных флагов из конвейера. Эти флаги могут затем использоваться для применения оптимизаций.

В коде вы можете увидеть это в AbstractPipeline.combinedFlags, который устанавливается во время построения (и в нескольких других случаях) путем объединения флага предыдущей и новой операции.

[5] java.util.stream - Параллелизм (с которым я не могу напрямую связать - прокрутите немного вниз):

Когда операция терминала инициируется, конвейер потока выполняется последовательно или параллельно в зависимости от ориентации потока, в котором он вызывается.

В коде вы можете увидеть это в AbstractPipeline.sequential , parallel, а также isParallel, который устанавливает / проверяет логический флаг в источнике потока, делая его неактуальным при вызове сеттеров при создании потока.

[6] java.util.stream.Stream.forEach:

Выполняет действие для каждого элемента этого потока. [...] Поведение этой операции явно недетерминировано.

Сравните это с java.util.stream.Stream.forEachOrdered:

Выполняет действие для каждого элемента этого потока в порядке встречи потока, если поток имеет определенный порядок встречи.

[7] Это также не ясно задокументировано, но моя интерпретация этого комментария Stream.skip (сильно сокращен мной):

[...] skip() [...] может быть довольно дорогим для упорядоченных параллельных конвейеров [...], так как skip (n) ограничен пропуском не только n элементов, но и первых n элементов в порядке встречи, [...] [R] удаление ограничения порядка [...] может привести к значительному ускорению skip() в параллельных конвейерах

Проблема в том, что вы используете параллельный поток вместе с forEach и ожидаете, что действие пропуска зависит от правильного порядка элементов, что здесь не так. Выдержка из документации forEach:

Для параллельных потоковых конвейеров эта операция не гарантирует соблюдение порядка встречи потока, так как это принесет ущерб преимуществам параллелизма.

Я предполагаю, что в основном происходит то, что операция пропуска сначала выполняется во второй строке, а не в первой. Если вы сделаете поток последовательным или используете forEachOrdered, вы увидите, что тогда он даст ожидаемый результат. Другой подход заключается в использовании коллекторов.

Позвольте мне процитировать что-то актуальное - Javadoc of skip:

Хотя skip() обычно является дешевой операцией для последовательных потоковых конвейеров, она может быть довольно дорогой для упорядоченных параллельных конвейеров, особенно для больших значений n, поскольку skip(n) ограничено пропуском не только n элементов, но и первых n элементы в порядке встречи.

Теперь, вполне уверен, что Files.lines() имеет четко определенный порядок встреч и является ORDERED stream (если бы не было, даже в последовательной операции не было бы никакой гарантии, что порядок встречи соответствует порядку файла), поэтому гарантируется, что результирующий поток будет детерминистически состоять только из второй строки в вашем примере.

Независимо от того, есть ли что-то еще, гарантия, безусловно, есть.

У меня есть идея, как обойти эту проблему, которую я не вижу в предыдущих обсуждениях. Вы можете воссоздать поток, разделив конвейер на два конвейера, сохраняя ленивость целиком.

public static <T> Stream<T> recreate(Stream<T> stream) {
    return StreamSupport.stream(stream.spliterator(), stream.isParallel())
                        .onClose(stream::close);
}

public static void main(String[] args) {
    recreate(new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5")).lines()
        .skip(1).parallel()).forEach(System.out::println);
}

Когда вы воссоздаете поток из начального потокового сплитератора, вы фактически создаете новый конвейер. В большинстве случаев recreate будет работать как no-opНо дело в том, что первый и второй конвейеры не разделяют parallel а также unordered состояния. Так что даже если вы используете forEach (или любая другая неупорядоченная операция терминала), только второй поток становится неупорядоченным.

Внутренне довольно похожая вещь объединяет ваш поток с пустым потоком:

Stream.concat(Stream.empty(), 
    new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5"))
          .lines().skip(1).parallel()).forEach(System.out::println);

Хотя это немного больше накладных расходов.

Другие вопросы по тегам