Группировка StreamEx по спискам возвращает неверное количество записей

Следующий код разбивает поток объектов на куски по 1000, обрабатывает их при материализации и возвращает общее количество объектов в конце.

Во всех случаях возвращаемое число является правильным, если размер потока не равен 1. В случае, если размер потока равен 1, возвращаемое число равно 0.

Любая помощь будет принята с благодарностью. Мне также пришлось взломать обратный вызов в случае, если в потоке нет записей равных 0. Я бы тоже хотел это исправить.

AtomicInteger recordCounter = new AtomicInteger(0);
try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
        stream.groupRuns((prev, next) -> recordCounter.incrementAndGet() % 1000 != 0)
              .forEach((chunk) ->
                      {
                          //... process each chunk
                      }
              );
    } catch(Exception e) {
        throw new MyRuntimeException("Failure streaming...", e);
    } finally {
        myObjects.close();
    }

return recordCounter.get() == 0 ? 0 : recordCounter.incrementAndGet();

4 ответа

Решение

В конце я пошел с Iterators.partition() Гуавы, чтобы разделить мой поток объектов на куски:

MutableInt recordCounter = new MutableInt();
try {
    Iterators.partition(myObjects.iterator(), 1000)
             .forEachRemaining((chunk) -> {
                      //process each chunk
                      ...
                      recordCounter.add(chunk.size());
             });
} catch (Exception e) {
    throw new MyRuntimeException("Failure streaming...", e);
} finally {
    myObjects.close();
}

return recordCounter.getValue();

Как говорит JavaDoc:

sameGroup - невмешивающий предикат без сохранения состояния, применяемый к паре смежных элементов, который возвращает true для элементов, принадлежащих к той же группе.

Предикат должен быть без гражданства, а это не ваш случай. Вы неправильно используете метод, поэтому не можете получить ожидаемый результат. Он работает близко к тому, что вы хотите, чисто случайно, вы не можете полагаться на это поведение, оно может измениться в будущих версиях StreamEx.

@Nazarii Бардюк объяснил, почему это не работает. Я отвечаю аналогичным требованиям, чтобы разделить поток раньше. Поэтому я разобрался и внес несколько изменений в: StreamEx-0.8.7. Вот простой пример:

int count = IntStreamEx.range(0, 10).boxed().splitToList(3).mapToInt(chunk -> {
    System.out.println(chunk);
    return chunk.size();
}).sum();

System.out.println(count);

Если вы находитесь в начале вашего проекта, вы можете попробовать, и код будет:

try (StreamEx<MyObject> stream = StreamEx.of(myObjects).onClose(() -> myObjects.close())) {
    return stream.splitToList(1000)
                 .mapToInt((chunk) -> {
                              //... process each chunk
                     return chunk.size();
                  }).sum();
}

Изначально счетчик использовался, чтобы знать, когда следует разбивать порции, и подсчет общего количества объектов ненадежен. Когда поток имеет размер 0 или 1 groupRuns функция не выполнена.

Поэтому вам нужен другой способ подсчета объектов. Вместо того, чтобы просто потреблять предметы в forEach вы можете вернуть количество обработанных объектов chunk.size() а также sum их в конце

    AtomicInteger counter = new AtomicInteger(0);
    try (StreamEx<MyObject> stream = StreamEx.of(myObjects)) {
        return stream
                .groupRuns((prev, next) -> counter.incrementAndGet() % 1000 != 0)
                .mapToLong((chunk) -> {
                     //... process each chunk
                     return chunk.size();
                 })
                .sum();
    } catch(Exception e) {
        throw new MyRuntimeException("Failure streaming...", e);
    } finally {
        myObjects.close();
    }
Другие вопросы по тегам